from django.conf import settings
from django.contrib.auth import login, authenticate, get_user_model
from django.http.response import JsonResponse
from django.views.decorators.csrf import ensure_csrf_cookie, csrf_exempt, csrf_protect
from django.utils.decorators import method_decorator
from django.core.exceptions import ValidationError
from django.middleware.csrf import get_token
from rest_framework.views import APIView
from rest_framework.parsers import JSONParser
from rest_framework import status
from Auth.pagination import CustomProfilesPagination
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from datetime import datetime, timedelta, timezone
import jwt
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
import json
from . import validator
from .models import Profile, ProfileRole
from rest_framework.decorators import action, api_view
from django.db.models import Q
from Auth.serializers import ProfileSerializer, ProfileRoleSerializer
from Subscriptions.models import RegistrationForm, Guardian
import N3wtSchool.mailManager as mailer
import Subscriptions.util as util
import logging
# NOTE: this import rebinds ``settings`` to the project settings module and
# therefore shadows ``django.conf.settings`` imported above. The order is kept
# on purpose so the name resolves exactly as before.
from N3wtSchool import bdd, error, settings
from rest_framework_simplejwt.authentication import JWTAuthentication

logger = logging.getLogger("AuthViews")


@swagger_auto_schema(
    method='get',
    operation_description="Obtenir un token CSRF",
    responses={200: openapi.Response('Token CSRF', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={
        'csrfToken': openapi.Schema(type=openapi.TYPE_STRING)
    }))}
)
@api_view(['GET'])
def csrf(request):
    """Return the CSRF token for the current request as ``{'csrfToken': ...}``."""
    token = get_token(request)
    return JsonResponse({'csrfToken': token})


class SessionView(APIView):
    """Validate a bearer JWT and return the matching user and roles."""

    @swagger_auto_schema(
        operation_description="Vérifier une session utilisateur",
        manual_parameters=[openapi.Parameter('Authorization', openapi.IN_HEADER, type=openapi.TYPE_STRING, description='Bearer token')],
        responses={
            200: openapi.Response('Session valide', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={
                'user': openapi.Schema(type=openapi.TYPE_OBJECT, properties={
                    'id': openapi.Schema(type=openapi.TYPE_INTEGER),
                    'email': openapi.Schema(type=openapi.TYPE_STRING),
                    'roleIndexLoginDefault': openapi.Schema(type=openapi.TYPE_INTEGER),
                    'roles': openapi.Schema(type=openapi.TYPE_ARRAY, items=openapi.Items(type=openapi.TYPE_OBJECT, properties={
                        'role_type': openapi.Schema(type=openapi.TYPE_STRING),
                        'establishment': openapi.Schema(type=openapi.TYPE_STRING)
                    }))
                })
            })),
            401: openapi.Response('Session invalide')
        }
    )
    def get(self, request):
        """Decode the ``Authorization: Bearer <jwt>`` header and describe the user.

        Returns 200 with the user's id, email, default role index and roles,
        or 401 when the token is expired, invalid, or references a profile
        that no longer exists.
        """
        token = request.META.get('HTTP_AUTHORIZATION', '').split('Bearer ')[-1]
        try:
            # Decode with the same key/algorithm that makeToken() signs with,
            # so the two stay in agreement if SIGNING_KEY differs from
            # SECRET_KEY or the algorithm is changed in settings.
            decoded_token = jwt.decode(
                token,
                settings.SIMPLE_JWT['SIGNING_KEY'],
                algorithms=[settings.SIMPLE_JWT['ALGORITHM']],
            )
            userid = decoded_token.get('user_id')
            user = Profile.objects.get(id=userid)
            roles = ProfileRole.objects.filter(profile=user).values('role_type', 'establishment__name')
            response_data = {
                'user': {
                    'id': user.id,
                    'email': user.email,
                    'roleIndexLoginDefault': user.roleIndexLoginDefault,
                    'roles': list(roles)
                }
            }
            return JsonResponse(response_data, status=status.HTTP_200_OK)
        except jwt.ExpiredSignatureError:
            return JsonResponse({"error": "Token has expired"}, status=status.HTTP_401_UNAUTHORIZED)
        except jwt.InvalidTokenError:
            return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
        except Profile.DoesNotExist:
            # A well-signed token whose user was deleted is not a valid session.
            return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)


class ProfileView(APIView):
    """List all profiles or create a new one."""

    @swagger_auto_schema(
        operation_description="Obtenir la liste des profils",
        responses={200: ProfileSerializer(many=True)}
    )
    def get(self, request):
        """Return every profile, serialized with ``ProfileSerializer``."""
        profilsList = bdd.getAllObjects(_objectName=Profile)
        profils_serializer = ProfileSerializer(profilsList, many=True)
        return JsonResponse(profils_serializer.data, safe=False)

    @swagger_auto_schema(
        operation_description="Créer un nouveau profil",
        request_body=ProfileSerializer,
        responses={
            200: ProfileSerializer,
            400: 'Données invalides'
        }
    )
    def post(self, request):
        """Create a profile from the JSON body; 400 with serializer errors if invalid."""
        profil_data = JSONParser().parse(request)
        profil_serializer = ProfileSerializer(data=profil_data)
        if profil_serializer.is_valid():
            profil_serializer.save()
            return JsonResponse(profil_serializer.data, safe=False)
        return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)


@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileSimpleView(APIView):
    """Retrieve, update or delete a single profile by id."""

    @swagger_auto_schema(
        operation_description="Obtenir un profil par son ID",
        responses={200: ProfileSerializer}
    )
    def get(self, request, id):
        """Return the profile looked up through ``bdd.getObject`` on its id."""
        profil = bdd.getObject(Profile, "id", id)
        profil_serializer = ProfileSerializer(profil)
        return JsonResponse(profil_serializer.data, safe=False)

    @swagger_auto_schema(
        operation_description="Mettre à jour un profil",
        request_body=ProfileSerializer,
        responses={
            200: 'Mise à jour réussie',
            400: 'Données invalides'
        }
    )
    def put(self, request, id):
        """Update the profile with the JSON body.

        Returns 404 when no profile has this id and 400 with the serializer
        errors when the payload is invalid.
        """
        data = JSONParser().parse(request)
        try:
            profil = Profile.objects.get(id=id)
        except Profile.DoesNotExist:
            return JsonResponse({"error": "Profile non trouvé."}, status=status.HTTP_404_NOT_FOUND)
        profil_serializer = ProfileSerializer(profil, data=data)
        if profil_serializer.is_valid():
            profil_serializer.save()
            return JsonResponse(profil_serializer.data, safe=False)
        return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

    @swagger_auto_schema(
        operation_description="Supprimer un profil",
        responses={200: 'Suppression réussie'}
    )
    def delete(self, request, id):
        """Delegate deletion to ``bdd.delete_object`` and return its response."""
        return bdd.delete_object(Profile, id)


@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
    """Authenticate a user by email/password and issue JWT tokens."""

    @swagger_auto_schema(
        operation_description="Connexion utilisateur",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'password'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING),
                'password': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Connexion réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'token': openapi.Schema(type=openapi.TYPE_STRING),
                    'refresh': openapi.Schema(type=openapi.TYPE_STRING)
                }
            )),
            400: openapi.Response('Connexion échouée', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING)
                }
            ))
        }
    )
    def post(self, request):
        """Log the user in and return ``{'token', 'refresh'}``.

        Returns 401 when the user has no active role, 500 when token
        generation fails, and 400 with ``errorFields``/``errorMessage`` when
        validation or authentication fails.
        """
        data = JSONParser().parse(request)
        validatorAuthentication = validator.ValidatorAuthentication(data=data)
        retour = error.returnMessage[error.WRONG_ID]
        validationOk, errorFields = validatorAuthentication.validate()
        user = None
        if validationOk:
            user = authenticate(
                email=data.get('email'),
                password=data.get('password'),
            )
            if user is not None:
                # Make sure the user has at least one active role.
                has_active_role = ProfileRole.objects.filter(profile=user, is_active=True).first()
                if not has_active_role:
                    return JsonResponse({"errorMessage": "Profil inactif"}, status=status.HTTP_401_UNAUTHORIZED)
                login(request, user)
                user.save()
                # makeToken() returns None on failure; unpacking it directly
                # would raise TypeError and surface as an opaque 500.
                tokens = makeToken(user)
                if tokens is None:
                    return JsonResponse(
                        {"errorMessage": "Erreur lors de la création du token"},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    )
                access_token, refresh_token = tokens
                return JsonResponse({
                    'token': access_token,
                    'refresh': refresh_token
                }, safe=False)
            else:
                retour = error.returnMessage[error.WRONG_ID]
        return JsonResponse({
            'errorFields': errorFields,
            'errorMessage': retour,
        }, safe=False, status=status.HTTP_400_BAD_REQUEST)


def makeToken(user):
    """Create an access/refresh JWT pair for ``user``.

    The access token carries the user's id, email, default role index and the
    list of active roles; the refresh token carries only the id. Lifetimes,
    signing key and algorithm are read from ``settings.SIMPLE_JWT``.

    Returns:
        ``(access_token, refresh_token)`` on success, or ``None`` when
        anything goes wrong (the error is logged). Callers must check for
        ``None`` before unpacking.
    """
    try:
        # Collect every active role of the user.
        roles = ProfileRole.objects.filter(profile=user, is_active=True).values(
            'role_type',
            'establishment__id',
            'establishment__name',
            'establishment__evaluation_frequency',
            'establishment__total_capacity',
            'establishment__api_docuseal',
        )
        # One timezone-aware timestamp shared by both tokens; replaces the
        # deprecated naive datetime.utcnow() and keeps iat/exp consistent.
        now = datetime.now(timezone.utc)
        signing_key = settings.SIMPLE_JWT['SIGNING_KEY']
        algorithm = settings.SIMPLE_JWT['ALGORITHM']

        access_payload = {
            'user_id': user.id,
            'email': user.email,
            'roleIndexLoginDefault': user.roleIndexLoginDefault,
            'roles': list(roles),
            'type': 'access',
            'exp': now + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'],
            'iat': now,
        }
        access_token = jwt.encode(access_payload, signing_key, algorithm=algorithm)

        # Refresh token: lifetime comes from SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'].
        refresh_payload = {
            'user_id': user.id,
            'type': 'refresh',
            'exp': now + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'],
            'iat': now,
        }
        refresh_token = jwt.encode(refresh_payload, signing_key, algorithm=algorithm)
        return access_token, refresh_token
    except Exception as e:
        logger.error(f"Erreur lors de la création du token: {str(e)}")
        return None


@method_decorator(csrf_exempt, name='dispatch')
class RefreshJWTView(APIView):
    """Exchange a valid refresh token for a new access/refresh pair."""

    @swagger_auto_schema(
        operation_description="Rafraîchir le token d'accès",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['refresh'],
            properties={
                'refresh': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Token rafraîchi avec succès', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'token': openapi.Schema(type=openapi.TYPE_STRING),
                    'refresh': openapi.Schema(type=openapi.TYPE_STRING),
                }
            )),
            400: openapi.Response('Échec du rafraîchissement', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING)
                }
            ))
        }
    )
    def post(self, request):
        """Validate the ``refresh`` token from the body and issue new tokens.

        Returns 400 for a missing, malformed, expired, invalid or
        wrong-type token, 404 when the user no longer exists, and 500 when
        token generation fails.
        """
        data = JSONParser().parse(request)
        refresh_token = data.get("refresh")
        # Check presence before touching the value: slicing None for the log
        # line below used to raise TypeError outside any handler.
        if not refresh_token:
            return JsonResponse({'errorMessage': 'Refresh token manquant'}, status=400)

        try:
            # Never log the full token.
            logger.info(f"Token reçu: {refresh_token[:20]}...")
            logger.info("Tentative de décodage du token")
            logger.info(f"Algorithme utilisé: {settings.SIMPLE_JWT['ALGORITHM']}")

            # Check the token shape before decoding.
            token_parts = refresh_token.split('.')
            if len(token_parts) != 3:
                logger.error("Format de token invalide - pas 3 parties")
                return JsonResponse({'errorMessage': 'Format de token invalide'}, status=400)

            payload = jwt.decode(
                refresh_token,
                settings.SIMPLE_JWT['SIGNING_KEY'],
                algorithms=[settings.SIMPLE_JWT['ALGORITHM']]  # must be a list
            )
            logger.info(f"Token décodé avec succès. Type: {payload.get('type')}")

            # Only refresh tokens are accepted here.
            if payload.get('type') != 'refresh':
                return JsonResponse({'errorMessage': 'Token invalide'}, status=400)

            # objects.get() raises DoesNotExist instead of returning None,
            # so the "user not found" case is handled in the except clause.
            user = Profile.objects.get(id=payload.get('user_id'))

            tokens = makeToken(user)
            if tokens is None:
                return JsonResponse(
                    {'errorMessage': 'Erreur lors de la création du token'},
                    status=status.HTTP_500_INTERNAL_SERVER_ERROR,
                )
            new_access_token, new_refresh_token = tokens
            return JsonResponse({'token': new_access_token, 'refresh': new_refresh_token}, status=200)
        except ExpiredSignatureError as e:
            logger.error(f"Token expiré: {str(e)}")
            return JsonResponse({'errorMessage': 'Refresh token expiré'}, status=400)
        except InvalidTokenError as e:
            logger.error(f"Token invalide: {str(e)}")
            return JsonResponse({'errorMessage': f'Token invalide: {str(e)}'}, status=400)
        except Profile.DoesNotExist:
            return JsonResponse({'errorMessage': 'Utilisateur non trouvé'}, status=404)
        except Exception as e:
            logger.error(f"Erreur inattendue: {str(e)}")
            return JsonResponse({'errorMessage': f'Erreur inattendue: {str(e)}'}, status=400)


@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class SubscribeView(APIView):
    """Activate an existing profile for an establishment and set its password."""

    @swagger_auto_schema(
        operation_description="Inscription utilisateur",
        manual_parameters=[
            openapi.Parameter(
                'establishment_id',
                openapi.IN_QUERY,
                description="ID de l'établissement",
                type=openapi.TYPE_INTEGER,
                required=True
            )
        ],
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'password1', 'password2'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING),
                'password1': openapi.Schema(type=openapi.TYPE_STRING),
                'password2': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Inscription réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'message': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT),
                    'id': openapi.Schema(type=openapi.TYPE_INTEGER)
                }
            ))
        }
    )
    def post(self, request):
        """Set the password of an existing profile and activate its role.

        ``establishment_id`` is read from the JSON body, falling back to the
        query string (where the API schema documents it). The response always
        carries ``message``, ``errorMessage`` and ``errorFields``; ``id`` is
        the profile id, or ``-1`` when no profile was processed.
        """
        retourErreur = ''
        retour = ''
        newProfilConnection = JSONParser().parse(request)

        # The body key used to be mandatory (KeyError -> 500) even though the
        # schema declares a query parameter; accept either.
        establishment_id = newProfilConnection.get('establishment_id')
        if establishment_id is None:
            establishment_id = request.GET.get('establishment_id')
        if establishment_id is None:
            return JsonResponse(
                {'message': retour, 'errorMessage': 'establishment_id est requis', "errorFields": {}, "id": -1},
                safe=False,
                status=status.HTTP_400_BAD_REQUEST,
            )

        validatorSubscription = validator.ValidatorSubscription(data=newProfilConnection)
        validationOk, errorFields = validatorSubscription.validate()

        if validationOk:
            # The email must match an existing profile; otherwise report an error.
            profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email'))
            if profil is None:
                retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS]
            else:
                # Is there already an active role for this establishment?
                active_roles = ProfileRole.objects.filter(profile=profil, establishment=establishment_id, is_active=True)
                if active_roles.exists():
                    retourErreur = error.returnMessage[error.PROFIL_ACTIVE]
                    return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)
                else:
                    try:
                        profil.set_password(newProfilConnection.get('password1'))
                        profil.full_clean()
                        profil.save()

                        # Reuse the existing role for this profile/establishment if any.
                        profile_role = ProfileRole.objects.filter(profile=profil, establishment=establishment_id).first()
                        if profile_role:
                            profile_role.is_active = True
                            profile_role.save()
                        else:
                            # No role yet: create one.
                            role_data = {
                                'profile': profil.id,
                                'establishment': establishment_id,
                                'is_active': True
                            }
                            role_serializer = ProfileRoleSerializer(data=role_data)
                            if role_serializer.is_valid():
                                role_serializer.save()
                            else:
                                return JsonResponse(role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

                        retour = error.returnMessage[error.MESSAGE_ACTIVATION_PROFILE]
                        retourErreur = ''
                        return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)
                    except ValidationError:
                        retourErreur = error.returnMessage[error.WRONG_MAIL_FORMAT]
                        return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields}, safe=False)

        return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": -1}, safe=False)


@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class NewPasswordView(APIView):
    """Start a password reset by emailing a temporary code."""

    @swagger_auto_schema(
        operation_description="Demande de nouveau mot de passe",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Demande réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'message': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT)
                }
            ))
        }
    )
    def post(self, request):
        """Store a random reset code with an expiry on the profile and mail it.

        Always answers with ``message``, ``errorMessage`` and ``errorFields``.
        """
        retourErreur = ''
        retour = ''
        newProfilConnection = JSONParser().parse(request)
        validatorNewPassword = validator.ValidatorNewPassword(data=newProfilConnection)
        validationOk, errorFields = validatorNewPassword.validate()
        if validationOk:
            profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email'))
            if profil is None:
                retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS]
            else:
                try:
                    # Generate a temporary code used to build the reset URL.
                    profil.code = util.genereRandomCode(12)
                    profil.datePeremption = util.calculeDatePeremption(util._now(), settings.EXPIRATION_URL_NB_DAYS)
                    profil.save()
                    retourErreur = ''
                    retour = error.returnMessage[error.MESSAGE_REINIT_PASSWORD] % (newProfilConnection.get('email'))
                    mailer.envoieReinitMotDePasse(newProfilConnection.get('email'), profil.code)
                except ValidationError:
                    retourErreur = error.returnMessage[error.WRONG_MAIL_FORMAT]
                    return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields}, safe=False)
        return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields}, safe=False)


@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ResetPasswordView(APIView):
    """Complete a password reset using the code from the reset URL."""

    @swagger_auto_schema(
        operation_description="Réinitialisation du mot de passe",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['password1', 'password2'],
            properties={
                'password1': openapi.Schema(type=openapi.TYPE_STRING),
                'password2': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Réinitialisation réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'message': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT)
                }
            ))
        }
    )
    def post(self, request, code):
        """Set a new password for the profile identified by ``code``.

        Reports an expired-URL error when the stored expiry date has passed;
        otherwise, if the payload validates, changes the password and clears
        the code and expiry so the link cannot be reused.
        """
        retourErreur = ''
        retour = ''
        newProfilConnection = JSONParser().parse(request)
        validatorResetPassword = validator.ValidatorResetPassword(data=newProfilConnection)
        validationOk, errorFields = validatorResetPassword.validate()

        date_format = '%d-%m-%Y %H:%M'
        profil = bdd.getObject(Profile, "code", code)
        # NOTE(review): when no profile matches the code the response carries
        # neither a message nor an error — confirm whether that is intended.
        if profil:
            # Round-trip "now" through the string format so both sides of the
            # comparison have the same (minute) resolution.
            now = datetime.strptime(util.convertToStr(util._now(), date_format), date_format)
            if now > datetime.strptime(profil.datePeremption, date_format):
                retourErreur = error.returnMessage[error.EXPIRED_URL]
            elif validationOk:
                retour = error.returnMessage[error.PASSWORD_CHANGED]
                profil.set_password(newProfilConnection.get('password1'))
                profil.code = ''
                profil.datePeremption = ''
                profil.save()
                retourErreur = ''
        return JsonResponse({'message': retour, "errorMessage": retourErreur, "errorFields": errorFields}, safe=False)


class ProfileRoleView(APIView):
    """List the profile roles of an establishment, or create a new one."""

    pagination_class = CustomProfilesPagination

    @swagger_auto_schema(
        operation_description="Obtenir la liste des profile_roles",
        responses={200: ProfileRoleSerializer(many=True)}
    )
    def get(self, request):
        """Return a paginated list of profile roles.

        Query parameters: ``establishment_id`` (required), ``filter``
        (``'parents'`` or ``'school'``, required) and ``page_size``.
        Returns 400 for a missing establishment or an unknown filter.
        """
        role_filter = request.GET.get('filter', '').strip()
        page_size = request.GET.get('page_size', None)
        establishment_id = request.GET.get('establishment_id', None)

        # page_size handling.
        # NOTE(review): the parsed value is not used below; the pagination
        # class presumably reads page_size from the request itself — confirm.
        if page_size is not None:
            try:
                page_size = int(page_size)
            except ValueError:
                page_size = settings.NB_RESULT_PROFILES_PER_PAGE

        if establishment_id is None:
            return JsonResponse({'error': 'establishment_id est requis'}, safe=False, status=status.HTTP_400_BAD_REQUEST)

        # Select the roles matching the requested filter.
        profiles_roles_List = ProfileRole.objects.filter(establishment_id=establishment_id)
        if role_filter == 'parents':
            profiles_roles_List = profiles_roles_List.filter(role_type=ProfileRole.RoleType.PROFIL_PARENT)
        elif role_filter == 'school':
            profiles_roles_List = profiles_roles_List.filter(
                Q(role_type=ProfileRole.RoleType.PROFIL_ECOLE) |
                Q(role_type=ProfileRole.RoleType.PROFIL_ADMIN)
            )
        else:
            return JsonResponse({'error': 'Filtre invalide'}, safe=False, status=status.HTTP_400_BAD_REQUEST)

        # Most recently updated first.
        profiles_roles_List = profiles_roles_List.distinct().order_by('-updated_date')
        # exists() avoids loading every row just to test for emptiness.
        if not profiles_roles_List.exists():
            return JsonResponse({'error': 'aucune donnée trouvée', 'count': 0}, safe=False)

        # Pagination.
        paginator = self.pagination_class()
        page = paginator.paginate_queryset(profiles_roles_List, request)
        if page is not None:
            profile_roles_serializer = ProfileRoleSerializer(page, many=True)
            response_data = paginator.get_paginated_response(profile_roles_serializer.data)
            return JsonResponse(response_data, safe=False)
        return JsonResponse({'error': 'aucune donnée trouvée', 'count': 0}, safe=False)

    @swagger_auto_schema(
        operation_description="Créer un nouveau profile_role",
        request_body=ProfileRoleSerializer,
        responses={
            200: ProfileRoleSerializer,
            400: 'Données invalides'
        }
    )
    def post(self, request):
        """Create a profile role from the JSON body; 400 with errors if invalid."""
        profile_role_data = JSONParser().parse(request)
        profile_role_serializer = ProfileRoleSerializer(data=profile_role_data)
        if profile_role_serializer.is_valid():
            profile_role_serializer.save()
            return JsonResponse(profile_role_serializer.data, safe=False)
        return JsonResponse(profile_role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)


@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileRoleSimpleView(APIView):
    """Retrieve, update or delete a single profile role by id."""

    @swagger_auto_schema(
        operation_description="Obtenir un profile_role par son ID",
        responses={200: ProfileRoleSerializer}
    )
    def get(self, request, id):
        """Return the profile role looked up through ``bdd.getObject`` on its id."""
        profile_role = bdd.getObject(ProfileRole, "id", id)
        profile_role_serializer = ProfileRoleSerializer(profile_role)
        return JsonResponse(profile_role_serializer.data, safe=False)

    @swagger_auto_schema(
        operation_description="Mettre à jour un profile_role",
        request_body=ProfileRoleSerializer,
        responses={
            200: 'Mise à jour réussie',
            400: 'Données invalides'
        }
    )
    def put(self, request, id):
        """Update the profile role with the JSON body.

        Returns 404 when no role has this id and 400 with the serializer
        errors when the payload is invalid.
        """
        data = JSONParser().parse(request)
        try:
            profile_role = ProfileRole.objects.get(id=id)
        except ProfileRole.DoesNotExist:
            return JsonResponse({"error": "ProfileRole non trouvé."}, status=status.HTTP_404_NOT_FOUND)
        profile_role_serializer = ProfileRoleSerializer(profile_role, data=data)
        if profile_role_serializer.is_valid():
            profile_role_serializer.save()
            return JsonResponse(profile_role_serializer.data, safe=False)
        return JsonResponse(profile_role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

    @swagger_auto_schema(
        operation_description="Supprimer un profile_role",
        responses={200: 'Suppression réussie'}
    )
    def delete(self, request, id):
        """Delete a profile role, and its profile if that was its last role.

        A parent role is refused (400) when removing it would leave one of
        the guardian's students without any other guardian. Returns 404 when
        the role does not exist and 500 on any other error.
        """
        try:
            profile_role = ProfileRole.objects.get(id=id)
            profile = profile_role.profile

            # Parent roles need an extra safety check on their students.
            if profile_role.role_type == ProfileRole.RoleType.PROFIL_PARENT:
                guardian = Guardian.objects.filter(profile_role=profile_role).first()
                if guardian:
                    for student in guardian.student_set.all():
                        # Refuse if this guardian is the student's only one.
                        other_guardians = student.guardians.exclude(id=guardian.id)
                        if not other_guardians.exists():
                            return JsonResponse(
                                {"error": f"Impossible de supprimer ce profil car l'élève {student.first_name} {student.last_name} n'aura plus de responsable légal."},
                                status=status.HTTP_400_BAD_REQUEST
                            )

            profile_role.delete()

            # Drop the profile too once it has no role left.
            if not ProfileRole.objects.filter(profile=profile).exists():
                profile.delete()

            return JsonResponse({'message': 'Suppression réussie'}, safe=False)
        except ProfileRole.DoesNotExist:
            return JsonResponse(
                {"error": "ProfileRole non trouvé."},
                status=status.HTTP_404_NOT_FOUND
            )
        except Exception as e:
            return JsonResponse(
                {"error": f"Une erreur est survenue : {str(e)}"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )