Files
n3wt-school/Back-End/Auth/views.py
N3WT DE COMPET fb73f9e9a8 feat: Gestion de la création d'un nouveau guardian, de l'association
avec un guardian dumême établissement, et de l'association avec un
guardian d'un autre établissement
2025-03-18 21:06:44 +01:00

606 lines
26 KiB
Python

from django.conf import settings
from django.contrib.auth import login, authenticate, get_user_model
from django.http.response import JsonResponse
from django.views.decorators.csrf import ensure_csrf_cookie, csrf_exempt, csrf_protect
from django.utils.decorators import method_decorator
from django.core.exceptions import ValidationError
from django.middleware.csrf import get_token
from rest_framework.views import APIView
from rest_framework.parsers import JSONParser
from rest_framework import status
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from datetime import datetime, timedelta
import jwt
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
import json
from . import validator
from .models import Profile, ProfileRole
from rest_framework.decorators import action, api_view
from Auth.serializers import ProfileSerializer, ProfileRoleSerializer
from Subscriptions.models import RegistrationForm
import Subscriptions.mailManager as mailer
import Subscriptions.util as util
import logging
from N3wtSchool import bdd, error
from rest_framework_simplejwt.authentication import JWTAuthentication
logger = logging.getLogger("AuthViews")
@swagger_auto_schema(
method='get',
operation_description="Obtenir un token CSRF",
responses={200: openapi.Response('Token CSRF', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={
'csrfToken': openapi.Schema(type=openapi.TYPE_STRING)
}))}
)
@api_view(['GET'])
def csrf(request):
token = get_token(request)
return JsonResponse({'csrfToken': token})
class SessionView(APIView):
@swagger_auto_schema(
operation_description="Vérifier une session utilisateur",
manual_parameters=[openapi.Parameter('Authorization', openapi.IN_HEADER, type=openapi.TYPE_STRING, description='Bearer token')],
responses={
200: openapi.Response('Session valide', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={
'user': openapi.Schema(type=openapi.TYPE_OBJECT, properties={
'id': openapi.Schema(type=openapi.TYPE_INTEGER),
'email': openapi.Schema(type=openapi.TYPE_STRING),
'roles': openapi.Schema(type=openapi.TYPE_ARRAY, items=openapi.Items(type=openapi.TYPE_OBJECT, properties={
'role_type': openapi.Schema(type=openapi.TYPE_STRING),
'establishment': openapi.Schema(type=openapi.TYPE_STRING)
}))
})
})),
401: openapi.Response('Session invalide')
}
)
def get(self, request):
token = request.META.get('HTTP_AUTHORIZATION', '').split('Bearer ')[-1]
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
userid = decoded_token.get('user_id')
user = Profile.objects.get(id=userid)
roles = ProfileRole.objects.filter(profile=user).values('role_type', 'establishment__name')
response_data = {
'user': {
'id': user.id,
'email': user.email,
'roles': list(roles)
}
}
return JsonResponse(response_data, status=status.HTTP_200_OK)
except jwt.ExpiredSignatureError:
return JsonResponse({"error": "Token has expired"}, status=status.HTTP_401_UNAUTHORIZED)
except jwt.InvalidTokenError:
return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
class ProfileView(APIView):
@swagger_auto_schema(
operation_description="Obtenir la liste des profils",
responses={200: ProfileSerializer(many=True)}
)
def get(self, request):
profilsList = bdd.getAllObjects(_objectName=Profile)
profils_serializer = ProfileSerializer(profilsList, many=True)
return JsonResponse(profils_serializer.data, safe=False)
@swagger_auto_schema(
operation_description="Créer un nouveau profil",
request_body=ProfileSerializer,
responses={
200: ProfileSerializer,
400: 'Données invalides'
}
)
def post(self, request):
profil_data = JSONParser().parse(request)
profil_serializer = ProfileSerializer(data=profil_data)
if profil_serializer.is_valid():
profil = profil_serializer.save()
return JsonResponse(profil_serializer.data, safe=False)
return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileSimpleView(APIView):
@swagger_auto_schema(
operation_description="Obtenir un profil par son ID",
responses={200: ProfileSerializer}
)
def get(self, request, id):
profil = bdd.getObject(Profile, "id", id)
profil_serializer = ProfileSerializer(profil)
return JsonResponse(profil_serializer.data, safe=False)
@swagger_auto_schema(
operation_description="Mettre à jour un profil",
request_body=ProfileSerializer,
responses={
200: 'Mise à jour réussie',
400: 'Données invalides'
}
)
def put(self, request, id):
data = JSONParser().parse(request)
profil = Profile.objects.get(id=id)
profil_serializer = ProfileSerializer(profil, data=data)
if profil_serializer.is_valid():
profil_serializer.save()
return JsonResponse(profil_serializer.data, safe=False)
return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)
@swagger_auto_schema(
operation_description="Supprimer un profil",
responses={200: 'Suppression réussie'}
)
def delete(self, request, id):
return bdd.delete_object(Profile, id)
@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
@swagger_auto_schema(
operation_description="Connexion utilisateur",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['email', 'password', 'role_type'],
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING),
'password': openapi.Schema(type=openapi.TYPE_STRING),
'role_type': openapi.Schema(type=openapi.TYPE_STRING)
}
),
responses={
200: openapi.Response('Connexion réussie', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'token': openapi.Schema(type=openapi.TYPE_STRING),
'refresh': openapi.Schema(type=openapi.TYPE_STRING)
}
)),
400: openapi.Response('Connexion échouée', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT),
'errorMessage': openapi.Schema(type=openapi.TYPE_STRING)
}
))
}
)
def post(self, request):
data = JSONParser().parse(request)
validatorAuthentication = validator.ValidatorAuthentication(data=data)
retour = error.returnMessage[error.WRONG_ID]
validationOk, errorFields = validatorAuthentication.validate()
user = None
if validationOk:
user = authenticate(
email=data.get('email'),
password=data.get('password'),
)
if user is not None:
role_type = data.get('role_type')
primary_role = ProfileRole.objects.filter(profile=user, role_type=role_type, is_active=True).first()
if not primary_role:
return JsonResponse({"errorMessage": "Profil inactif"}, status=status.HTTP_401_UNAUTHORIZED)
login(request, user)
user.save()
retour = ''
# Récupérer tous les rôles de l'utilisateur avec le type spécifié
roles = ProfileRole.objects.filter(profile=user, role_type=role_type).values('role_type', 'establishment__id', 'establishment__name')
# Générer le JWT avec la bonne syntaxe datetime
access_payload = {
'user_id': user.id,
'email': user.email,
'roles': list(roles),
'type': 'access',
'exp': datetime.utcnow() + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'],
'iat': datetime.utcnow(),
}
access_token = jwt.encode(access_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM'])
# Générer le Refresh Token (exp: 7 jours)
refresh_payload = {
'user_id': user.id,
'type': 'refresh',
'exp': datetime.utcnow() + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'],
'iat': datetime.utcnow(),
}
refresh_token = jwt.encode(refresh_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM'])
return JsonResponse({
'token': access_token,
'refresh': refresh_token
}, safe=False)
else:
retour = error.returnMessage[error.WRONG_ID]
return JsonResponse({
'errorFields': errorFields,
'errorMessage': retour,
}, safe=False, status=status.HTTP_400_BAD_REQUEST)
class RefreshJWTView(APIView):
@swagger_auto_schema(
operation_description="Rafraîchir le token d'accès",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['refresh'],
properties={
'refresh': openapi.Schema(type=openapi.TYPE_STRING)
}
),
responses={
200: openapi.Response('Token rafraîchi avec succès', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'token': openapi.Schema(type=openapi.TYPE_STRING),
'refresh': openapi.Schema(type=openapi.TYPE_STRING),
}
)),
400: openapi.Response('Échec du rafraîchissement', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'errorMessage': openapi.Schema(type=openapi.TYPE_STRING)
}
))
}
)
@method_decorator(csrf_exempt, name='dispatch')
def post(self, request):
data = JSONParser().parse(request)
refresh_token = data.get("refresh")
logger.info(f"Token reçu: {refresh_token[:20]}...") # Ne pas logger le token complet pour la sécurité
if not refresh_token:
return JsonResponse({'errorMessage': 'Refresh token manquant'}, status=400)
try:
# Décoder le Refresh Token
logger.info("Tentative de décodage du token")
logger.info(f"Algorithme utilisé: {settings.SIMPLE_JWT['ALGORITHM']}")
# Vérifier le format du token avant décodage
token_parts = refresh_token.split('.')
if len(token_parts) != 3:
logger.error("Format de token invalide - pas 3 parties")
return JsonResponse({'errorMessage': 'Format de token invalide'}, status=400)
payload = jwt.decode(
refresh_token,
settings.SIMPLE_JWT['SIGNING_KEY'],
algorithms=[settings.SIMPLE_JWT['ALGORITHM']] # Noter le passage en liste
)
logger.info(f"Token décodé avec succès. Type: {payload.get('type')}")
# Vérifier s'il s'agit bien d'un Refresh Token
if payload.get('type') != 'refresh':
return JsonResponse({'errorMessage': 'Token invalide'}, status=400)
# Récupérer les informations utilisateur
user = Profile.objects.get(id=payload['user_id'])
role_type = payload.get('role_type')
# Récupérer le rôle principal de l'utilisateur
primary_role = ProfileRole.objects.filter(profile=user, role_type=role_type, is_active=True).first()
if not primary_role:
return JsonResponse({'errorMessage': 'Profil inactif'}, status=400)
# Générer un nouveau Access Token avec les informations complètes
new_access_payload = {
'user_id': user.id,
'email': user.email,
'role_type': primary_role.get_role_type_display(),
'establishment': primary_role.establishment.id,
'type': 'access',
'exp': datetime.utcnow() + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'],
'iat': datetime.utcnow(),
}
new_access_token = jwt.encode(new_access_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM'])
new_refresh_payload = {
'user_id': user.id,
'role_type': role_type,
'type': 'refresh',
'exp': datetime.utcnow() + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'],
'iat': datetime.utcnow(),
}
new_refresh_token = jwt.encode(new_refresh_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM'])
return JsonResponse({'token': new_access_token, 'refresh': new_refresh_token}, status=200)
except ExpiredSignatureError as e:
logger.error(f"Token expiré: {str(e)}")
return JsonResponse({'errorMessage': 'Refresh token expiré'}, status=400)
except InvalidTokenError as e:
logger.error(f"Token invalide: {str(e)}")
return JsonResponse({'errorMessage': f'Token invalide: {str(e)}'}, status=400)
except Exception as e:
logger.error(f"Erreur inattendue: {str(e)}")
return JsonResponse({'errorMessage': f'Erreur inattendue: {str(e)}'}, status=400)
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class SubscribeView(APIView):
@swagger_auto_schema(
operation_description="Inscription utilisateur",
manual_parameters=[
openapi.Parameter(
'establishment_id', openapi.IN_QUERY,
description="ID de l'établissement",
type=openapi.TYPE_INTEGER,
required=True
)
],
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['email', 'password1', 'password2'],
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING),
'password1': openapi.Schema(type=openapi.TYPE_STRING),
'password2': openapi.Schema(type=openapi.TYPE_STRING)
}
),
responses={
200: openapi.Response('Inscription réussie', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'message': openapi.Schema(type=openapi.TYPE_STRING),
'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT),
'id': openapi.Schema(type=openapi.TYPE_INTEGER)
}
))
}
)
def post(self, request):
retourErreur = error.returnMessage[error.BAD_URL]
retour = ''
newProfilConnection = JSONParser().parse(request)
establishment_id = newProfilConnection['establishment_id']
validatorSubscription = validator.ValidatorSubscription(data=newProfilConnection)
validationOk, errorFields = validatorSubscription.validate()
if validationOk:
# On vérifie que l'email existe : si ce n'est pas le cas, on retourne une erreur
profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email'))
if profil is None:
retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS]
else:
# Vérifier si le profil a déjà un rôle actif pour l'établissement donné
active_roles = ProfileRole.objects.filter(profile=profil, establishment=establishment_id, is_active=True)
if active_roles.exists():
retourErreur = error.returnMessage[error.PROFIL_ACTIVE]
return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)
else:
try:
profil.set_password(newProfilConnection.get('password1'))
profil.full_clean()
profil.save()
# Récupérer le ProfileRole existant pour l'établissement et le profil
profile_role = ProfileRole.objects.filter(profile=profil, establishment=establishment_id).first()
if profile_role:
profile_role.is_active = True
profile_role.save()
else:
# Si aucun ProfileRole n'existe, en créer un nouveau
role_data = {
'profile': profil.id,
'establishment': establishment_id,
'is_active': True
}
role_serializer = ProfileRoleSerializer(data=role_data)
if role_serializer.is_valid():
role_serializer.save()
else:
return JsonResponse(role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)
retour = error.returnMessage[error.MESSAGE_ACTIVATION_PROFILE]
retourErreur = ''
return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)
except ValidationError as e:
retourErreur = error.returnMessage[error.WRONG_MAIL_FORMAT]
return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields}, safe=False)
return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": -1}, safe=False)
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class NewPasswordView(APIView):
@swagger_auto_schema(
operation_description="Demande de nouveau mot de passe",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['email'],
properties={
'email': openapi.Schema(type=openapi.TYPE_STRING)
}
),
responses={
200: openapi.Response('Demande réussie', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'message': openapi.Schema(type=openapi.TYPE_STRING),
'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT)
}
))
}
)
def post(self, request):
retourErreur = error.returnMessage[error.BAD_URL]
retour = ''
newProfilConnection = JSONParser().parse(request)
validatorNewPassword = validator.ValidatorNewPassword(data=newProfilConnection)
validationOk, errorFields = validatorNewPassword.validate()
if validationOk:
profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email'))
if profil is None:
retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS]
else:
try:
# Génération d'une URL provisoire pour modifier le mot de passe
profil.code = util.genereRandomCode(12)
profil.datePeremption = util.calculeDatePeremption(util._now(), settings.EXPIRATION_URL_NB_DAYS)
profil.save()
retourErreur = ''
retour = error.returnMessage[error.MESSAGE_REINIT_PASSWORD] % (newProfilConnection.get('email'))
mailer.envoieReinitMotDePasse(newProfilConnection.get('email'), profil.code)
except ValidationError as e:
retourErreur = error.returnMessage[error.WRONG_MAIL_FORMAT]
return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields}, safe=False)
return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields}, safe=False)
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ResetPasswordView(APIView):
@swagger_auto_schema(
operation_description="Réinitialisation du mot de passe",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['password1', 'password2'],
properties={
'password1': openapi.Schema(type=openapi.TYPE_STRING),
'password2': openapi.Schema(type=openapi.TYPE_STRING)
}
),
responses={
200: openapi.Response('Réinitialisation réussie', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'message': openapi.Schema(type=openapi.TYPE_STRING),
'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT)
}
))
}
)
def post(self, request, code):
retourErreur = error.returnMessage[error.BAD_URL]
retour = ''
newProfilConnection = JSONParser().parse(request)
validatorResetPassword = validator.ValidatorResetPassword(data=newProfilConnection)
validationOk, errorFields = validatorResetPassword.validate()
profil = bdd.getObject(Profile, "code", code)
if profil:
if datetime.strptime(util.convertToStr(util._now(), '%d-%m-%Y %H:%M'), '%d-%m-%Y %H:%M') > datetime.strptime(profil.datePeremption, '%d-%m-%Y %H:%M'):
retourErreur = error.returnMessage[error.EXPIRED_URL] % (_uuid)
elif validationOk:
retour = error.returnMessage[error.PASSWORD_CHANGED]
profil.set_password(newProfilConnection.get('password1'))
profil.code = ''
profil.datePeremption = ''
profil.save()
retourErreur = ''
return JsonResponse({'message': retour, "errorMessage": retourErreur, "errorFields": errorFields}, safe=False)
class ProfileRoleView(APIView):
@swagger_auto_schema(
operation_description="Obtenir la liste des profile_roles",
responses={200: ProfileRoleSerializer(many=True)}
)
def get(self, request):
establishment_id = request.GET.get('establishment_id', None)
if establishment_id is None:
return JsonResponse({'error': 'establishment_id est requis'}, safe=False, status=status.HTTP_400_BAD_REQUEST)
profiles_roles_List = bdd.getAllObjects(_objectName=ProfileRole)
if profiles_roles_List:
profiles_roles_List = profiles_roles_List.filter(establishment=establishment_id).distinct()
profile_roles_serializer = ProfileRoleSerializer(profiles_roles_List, many=True)
return JsonResponse(profile_roles_serializer.data, safe=False)
@swagger_auto_schema(
operation_description="Créer un nouveau profile_role",
request_body=ProfileRoleSerializer,
responses={
200: ProfileRoleSerializer,
400: 'Données invalides'
}
)
def post(self, request):
profile_role_data = JSONParser().parse(request)
profile_role_serializer = ProfileRoleSerializer(data=profile_role_data)
if profile_role_serializer.is_valid():
profile_role = profile_role_serializer.save()
return JsonResponse(profile_role_serializer.data, safe=False)
return JsonResponse(profile_role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileRoleSimpleView(APIView):
@swagger_auto_schema(
operation_description="Obtenir un profile_role par son ID",
responses={200: ProfileRoleSerializer}
)
def get(self, request, id):
profile_role = bdd.getObject(ProfileRole, "id", id)
profile_role_serializer = ProfileRoleSerializer(profile_role)
return JsonResponse(profile_role_serializer.data, safe=False)
@swagger_auto_schema(
operation_description="Mettre à jour un profile_role",
request_body=ProfileRoleSerializer,
responses={
200: 'Mise à jour réussie',
400: 'Données invalides'
}
)
def put(self, request, id):
data = JSONParser().parse(request)
profile_role = ProfileRole.objects.get(id=id)
profile_role_serializer = ProfileRoleSerializer(profile_role, data=data)
if profile_role_serializer.is_valid():
profile_role_serializer.save()
return JsonResponse(profile_role_serializer.data, safe=False)
return JsonResponse(profile_role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)
@swagger_auto_schema(
operation_description="Supprimer un profile_role",
responses={200: 'Suppression réussie'}
)
def delete(self, request, id):
profile_role = ProfileRole.objects.get(id=id)
profile = profile_role.profile
profile_role.delete()
# Vérifier si le profil n'a plus de rôles associés
if not ProfileRole.objects.filter(profile=profile).exists():
profile.delete()
return JsonResponse({'message': 'Suppression réussie'}, safe=False)