from django.conf import settings from django.contrib.auth import login, authenticate, get_user_model from django.http.response import JsonResponse from django.views.decorators.csrf import ensure_csrf_cookie, csrf_exempt, csrf_protect from django.utils.decorators import method_decorator from django.core.exceptions import ValidationError from django.core.cache import cache from django.middleware.csrf import get_token from rest_framework.views import APIView from rest_framework.parsers import JSONParser from rest_framework import status from drf_yasg.utils import swagger_auto_schema from drf_yasg import openapi from datetime import datetime, timedelta import jwt from jwt.exceptions import ExpiredSignatureError, InvalidTokenError import json from . import validator from .models import Profile from rest_framework.decorators import action, api_view from Auth.serializers import ProfileSerializer, ProfilUpdateSerializer from Subscriptions.models import RegistrationForm from Subscriptions.signals import clear_cache import Subscriptions.mailManager as mailer import Subscriptions.util as util import logging from N3wtSchool import bdd, error from rest_framework_simplejwt.authentication import JWTAuthentication logger = logging.getLogger("AuthViews") @swagger_auto_schema( method='get', operation_description="Obtenir un token CSRF", responses={200: openapi.Response('Token CSRF', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={ 'csrfToken': openapi.Schema(type=openapi.TYPE_STRING) }))} ) @api_view(['GET']) def csrf(request): token = get_token(request) return JsonResponse({'csrfToken': token}) class SessionView(APIView): @swagger_auto_schema( operation_description="Vérifier une session utilisateur", manual_parameters=[openapi.Parameter('Authorization', openapi.IN_HEADER, type=openapi.TYPE_STRING, description='Bearer token')], responses={ 200: openapi.Response('Session valide', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={ 'user': openapi.Schema(type=openapi.TYPE_OBJECT, properties={ 'id': openapi.Schema(type=openapi.TYPE_INTEGER), 'email': openapi.Schema(type=openapi.TYPE_STRING), 'role': openapi.Schema(type=openapi.TYPE_STRING) }) })), 401: openapi.Response('Session invalide') } ) def get(self, request): token = request.META.get('HTTP_AUTHORIZATION', '').split('Bearer ')[-1] try: decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256']) print(f'decode : {decoded_token}') userid = decoded_token.get('id') user = Profile.objects.get(id=userid) response_data = { 'user': { 'id': user.id, 'email': user.email, 'role': user.droit, # Assure-toi que le champ 'droit' existe et contient le rôle } } return JsonResponse(response_data, status=status.HTTP_200_OK) except jwt.ExpiredSignatureError: return JsonResponse({"error": "Token has expired"}, status=status.HTTP_401_UNAUTHORIZED) except jwt.InvalidTokenError: return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED) class ProfileView(APIView): @swagger_auto_schema( operation_description="Obtenir la liste des profils", responses={200: ProfileSerializer(many=True)} ) def get(self, request): profilsList = bdd.getAllObjects(_objectName=Profile) profils_serializer = ProfileSerializer(profilsList, many=True) return JsonResponse(profils_serializer.data, safe=False) @swagger_auto_schema( operation_description="Créer un nouveau profil", request_body=ProfileSerializer, responses={ 200: ProfileSerializer, 400: 'Données invalides' } ) def post(self, request): profil_data=JSONParser().parse(request) print(f'{profil_data}') profil_serializer = ProfileSerializer(data=profil_data) if profil_serializer.is_valid(): profil_serializer.save() return JsonResponse(profil_serializer.data, safe=False) return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST) @method_decorator(csrf_protect, name='dispatch') @method_decorator(ensure_csrf_cookie, name='dispatch') class ProfileSimpleView(APIView): @swagger_auto_schema( operation_description="Obtenir un profil par son ID", responses={200: ProfileSerializer} ) def get(self, request, id): profil=bdd.getObject(Profile, "id", id) profil_serializer=ProfileSerializer(profil) return JsonResponse(profil_serializer.data, safe=False) @swagger_auto_schema( operation_description="Mettre à jour un profil", request_body=ProfilUpdateSerializer, responses={ 200: 'Mise à jour réussie', 400: 'Données invalides' } ) def put(self, request, id): data=JSONParser().parse(request) profil = Profile.objects.get(id=id) profil_serializer = ProfilUpdateSerializer(profil, data=data) if profil_serializer.is_valid(): profil_serializer.save() return JsonResponse("Updated Successfully", safe=False) return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST) @swagger_auto_schema( operation_description="Supprimer un profil", responses={200: 'Suppression réussie'} ) def delete(self, request, id): return bdd.delete_object(Profile, id) @method_decorator(csrf_exempt, name='dispatch') class LoginView(APIView): @swagger_auto_schema( operation_description="Connexion utilisateur", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=['email', 'password'], properties={ 'email': openapi.Schema(type=openapi.TYPE_STRING), 'password': openapi.Schema(type=openapi.TYPE_STRING) } ), responses={ 200: openapi.Response('Connexion réussie', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'token': openapi.Schema(type=openapi.TYPE_STRING), 'refresh': openapi.Schema(type=openapi.TYPE_STRING) } )), 400: openapi.Response('Connexion échouée', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT), 'errorMessage': openapi.Schema(type=openapi.TYPE_STRING) } )) } ) def post(self, request): data = JSONParser().parse(request) validatorAuthentication = validator.ValidatorAuthentication(data=data) retour = error.returnMessage[error.WRONG_ID] validationOk, errorFields = validatorAuthentication.validate() user = None if validationOk: user = authenticate( email=data.get('email'), password=data.get('password'), ) if user is not None: if user.is_active: login(request, user) user.estConnecte = True user.save() clear_cache() retour = '' # Générer le JWT avec la bonne syntaxe datetime access_payload = { 'user_id': user.id, 'email': user.email, 'droit': user.droit, 'establishment': user.establishment.id, 'type': 'access', 'exp': datetime.utcnow() + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'], 'iat': datetime.utcnow(), } access_token = jwt.encode(access_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM']) # Générer le Refresh Token (exp: 7 jours) refresh_payload = { 'user_id': user.id, 'type': 'refresh', 'exp': datetime.utcnow() + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'], 'iat': datetime.utcnow(), } refresh_token = jwt.encode(refresh_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM']) return JsonResponse({ 'token': access_token, 'refresh': refresh_token }, safe=False) else: retour = error.returnMessage[error.PROFIL_INACTIVE] else: retour = error.returnMessage[error.WRONG_ID] return JsonResponse({ 'errorFields': errorFields, 'errorMessage': retour, }, safe=False, status=status.HTTP_400_BAD_REQUEST) class RefreshJWTView(APIView): @swagger_auto_schema( operation_description="Rafraîchir le token d'accès", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=['refresh'], properties={ 'refresh': openapi.Schema(type=openapi.TYPE_STRING) } ), responses={ 200: openapi.Response('Token rafraîchi avec succès', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'token': openapi.Schema(type=openapi.TYPE_STRING), 'refresh': openapi.Schema(type=openapi.TYPE_STRING), } )), 400: openapi.Response('Échec du rafraîchissement', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'errorMessage': openapi.Schema(type=openapi.TYPE_STRING) } )) } ) @method_decorator(csrf_exempt, name='dispatch') def post(self, request): data = JSONParser().parse(request) refresh_token = data.get("refresh") logger.info(f"Token reçu: {refresh_token[:20]}...") # Ne pas logger le token complet pour la sécurité if not refresh_token: return JsonResponse({'errorMessage': 'Refresh token manquant'}, status=400) try: # Décoder le Refresh Token logger.info("Tentative de décodage du token") logger.info(f"Algorithme utilisé: {settings.SIMPLE_JWT['ALGORITHM']}") # Vérifier le format du token avant décodage token_parts = refresh_token.split('.') if len(token_parts) != 3: logger.error("Format de token invalide - pas 3 parties") return JsonResponse({'errorMessage': 'Format de token invalide'}, status=400) payload = jwt.decode( refresh_token, settings.SIMPLE_JWT['SIGNING_KEY'], algorithms=[settings.SIMPLE_JWT['ALGORITHM']] # Noter le passage en liste ) logger.info(f"Token décodé avec succès. Type: {payload.get('type')}") # Vérifier s'il s'agit bien d'un Refresh Token if payload.get('type') != 'refresh': return JsonResponse({'errorMessage': 'Token invalide'}, status=400) # Récupérer les informations utilisateur user = Profile.objects.get(id=payload['user_id']) # Générer un nouveau Access Token avec les informations complètes new_access_payload = { 'user_id': user.id, 'email': user.email, 'droit': user.droit, 'establishment': user.establishment.id, 'type': 'access', 'exp': datetime.utcnow() + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'], 'iat': datetime.utcnow(), } new_access_token = jwt.encode(new_access_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM']) new_refresh_payload = { 'user_id': user.id, 'type': 'refresh', 'exp': datetime.utcnow() + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'], 'iat': datetime.utcnow(), } new_refresh_token = jwt.encode(new_refresh_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM']) return JsonResponse({'token': new_access_token, 'refresh': new_refresh_token}, status=200) except ExpiredSignatureError as e: logger.error(f"Token expiré: {str(e)}") return JsonResponse({'errorMessage': 'Refresh token expiré'}, status=400) except InvalidTokenError as e: logger.error(f"Token invalide: {str(e)}") return JsonResponse({'errorMessage': f'Token invalide: {str(e)}'}, status=400) except Exception as e: logger.error(f"Erreur inattendue: {str(e)}") return JsonResponse({'errorMessage': f'Erreur inattendue: {str(e)}'}, status=400) @method_decorator(csrf_protect, name='dispatch') @method_decorator(ensure_csrf_cookie, name='dispatch') class SubscribeView(APIView): @swagger_auto_schema( operation_description="Inscription utilisateur", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=['email', 'password1', 'password2'], properties={ 'email': openapi.Schema(type=openapi.TYPE_STRING), 'password1': openapi.Schema(type=openapi.TYPE_STRING), 'password2': openapi.Schema(type=openapi.TYPE_STRING) } ), responses={ 200: openapi.Response('Inscription réussie', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'message': openapi.Schema(type=openapi.TYPE_STRING), 'errorMessage': openapi.Schema(type=openapi.TYPE_STRING), 'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT), 'id': openapi.Schema(type=openapi.TYPE_INTEGER) } )) } ) def post(self, request): retourErreur = error.returnMessage[error.BAD_URL] retour = '' newProfilConnection=JSONParser().parse(request) validatorSubscription = validator.ValidatorSubscription(data=newProfilConnection) validationOk, errorFields = validatorSubscription.validate() if validationOk: # On vérifie que l'email existe : si ce n'est pas le cas, on retourne une erreur profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email')) if profil == None: retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS] else: if profil.is_active: retourErreur=error.returnMessage[error.PROFIL_ACTIVE] return JsonResponse({'message':retour,'errorMessage':retourErreur, "errorFields":errorFields, "id":profil.id}, safe=False) else: try: profil.set_password(newProfilConnection.get('password1')) profil.is_active = True profil.full_clean() profil.save() clear_cache() retour = error.returnMessage[error.MESSAGE_ACTIVATION_PROFILE] retourErreur='' return JsonResponse({'message':retour,'errorMessage':retourErreur, "errorFields":errorFields, "id":profil.id}, safe=False) except ValidationError as e: retourErreur = error.returnMessage[error.WRONG_MAIL_FORMAT] return JsonResponse({'message':retour,'errorMessage':retourErreur, "errorFields":errorFields}, safe=False) return JsonResponse({'message':retour, 'errorMessage':retourErreur, "errorFields":errorFields, "id":-1}, safe=False) @method_decorator(csrf_protect, name='dispatch') @method_decorator(ensure_csrf_cookie, name='dispatch') class NewPasswordView(APIView): @swagger_auto_schema( operation_description="Demande de nouveau mot de passe", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=['email'], properties={ 'email': openapi.Schema(type=openapi.TYPE_STRING) } ), responses={ 200: openapi.Response('Demande réussie', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'message': openapi.Schema(type=openapi.TYPE_STRING), 'errorMessage': openapi.Schema(type=openapi.TYPE_STRING), 'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT) } )) } ) def post(self, request): retourErreur = error.returnMessage[error.BAD_URL] retour = '' newProfilConnection=JSONParser().parse(request) validatorNewPassword = validator.ValidatorNewPassword(data=newProfilConnection) validationOk, errorFields = validatorNewPassword.validate() if validationOk: profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email')) if profil == None: retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS] else: # Génération d'une URL provisoire pour modifier le mot de passe profil.code = util.genereRandomCode(12) profil.datePeremption = util.calculeDatePeremption(util._now(), settings.EXPIRATION_URL_NB_DAYS) profil.save() clear_cache() retourErreur = '' retour = error.returnMessage[error.MESSAGE_REINIT_PASSWORD]%(newProfilConnection.get('email')) mailer.envoieReinitMotDePasse(newProfilConnection.get('email'), profil.code) return JsonResponse({'message':retour, 'errorMessage':retourErreur, "errorFields":errorFields}, safe=False) @method_decorator(csrf_protect, name='dispatch') @method_decorator(ensure_csrf_cookie, name='dispatch') class ResetPasswordView(APIView): @swagger_auto_schema( operation_description="Réinitialisation du mot de passe", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=['password1', 'password2'], properties={ 'password1': openapi.Schema(type=openapi.TYPE_STRING), 'password2': openapi.Schema(type=openapi.TYPE_STRING) } ), responses={ 200: openapi.Response('Réinitialisation réussie', schema=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'message': openapi.Schema(type=openapi.TYPE_STRING), 'errorMessage': openapi.Schema(type=openapi.TYPE_STRING), 'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT) } )) } ) def post(self, request, code): retourErreur = error.returnMessage[error.BAD_URL] retour = '' newProfilConnection=JSONParser().parse(request) validatorResetPassword = validator.ValidatorResetPassword(data=newProfilConnection) validationOk, errorFields = validatorResetPassword.validate() profil = bdd.getObject(Profile, "code", code) if profil: if datetime.strptime(util.convertToStr(util._now(), '%d-%m-%Y %H:%M'), '%d-%m-%Y %H:%M') > datetime.strptime(profil.datePeremption, '%d-%m-%Y %H:%M'): retourErreur = error.returnMessage[error.EXPIRED_URL]%(_uuid) elif validationOk: retour = error.returnMessage[error.PASSWORD_CHANGED] profil.set_password(newProfilConnection.get('password1')) profil.code = '' profil.datePeremption = '' profil.is_active = True profil.save() clear_cache() retourErreur='' return JsonResponse({'message':retour, "errorMessage":retourErreur, "errorFields":errorFields}, safe=False)