mirror of
https://git.v0id.ovh/n3wt-innov/n3wt-school.git
synced 2026-01-29 16:03:21 +00:00
668 lines
29 KiB
Python
668 lines
29 KiB
Python
from django.conf import settings
|
|
from django.contrib.auth import login, authenticate, get_user_model
|
|
from django.http.response import JsonResponse
|
|
from django.views.decorators.csrf import ensure_csrf_cookie, csrf_exempt, csrf_protect
|
|
from django.utils.decorators import method_decorator
|
|
from django.core.exceptions import ValidationError
|
|
from django.middleware.csrf import get_token
|
|
from rest_framework.views import APIView
|
|
from rest_framework.parsers import JSONParser
|
|
from rest_framework import status
|
|
from Auth.pagination import CustomProfilesPagination
|
|
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from drf_yasg import openapi
|
|
|
|
from datetime import datetime, timedelta
|
|
import jwt
|
|
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
|
|
import json
|
|
|
|
from . import validator
|
|
from .models import Profile, ProfileRole
|
|
from rest_framework.decorators import action, api_view
|
|
from django.db.models import Q
|
|
|
|
from Auth.serializers import ProfileSerializer, ProfileRoleSerializer
|
|
from Subscriptions.models import RegistrationForm, Guardian
|
|
import N3wtSchool.mailManager as mailer
|
|
import Subscriptions.util as util
|
|
import logging
|
|
from N3wtSchool import bdd, error, settings
|
|
|
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
|
|
|
# Module-level logger for this file, registered under the "AuthViews" name.
logger = logging.getLogger("AuthViews")
|
|
|
|
|
|
@swagger_auto_schema(
    method='get',
    operation_description="Obtenir un token CSRF",
    responses={
        200: openapi.Response(
            'Token CSRF',
            schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'csrfToken': openapi.Schema(type=openapi.TYPE_STRING),
                },
            ),
        ),
    },
)
@api_view(['GET'])
def csrf(request):
    """Return the CSRF token for the current request as JSON.

    The token is obtained through Django's ``get_token`` and sent back under
    the ``csrfToken`` key.
    """
    return JsonResponse({'csrfToken': get_token(request)})
|
|
|
|
class SessionView(APIView):
    """Check a bearer JWT and describe the profile it belongs to."""

    @swagger_auto_schema(
        operation_description="Vérifier une session utilisateur",
        manual_parameters=[openapi.Parameter('Authorization', openapi.IN_HEADER, type=openapi.TYPE_STRING, description='Bearer token')],
        responses={
            200: openapi.Response('Session valide', schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={
                'user': openapi.Schema(type=openapi.TYPE_OBJECT, properties={
                    'id': openapi.Schema(type=openapi.TYPE_INTEGER),
                    'email': openapi.Schema(type=openapi.TYPE_STRING),
                    'roleIndexLoginDefault': openapi.Schema(type=openapi.TYPE_INTEGER),
                    'roles': openapi.Schema(type=openapi.TYPE_ARRAY, items=openapi.Items(type=openapi.TYPE_OBJECT, properties={
                        'role_type': openapi.Schema(type=openapi.TYPE_STRING),
                        'establishment': openapi.Schema(type=openapi.TYPE_STRING)
                    }))
                })
            })),
            401: openapi.Response('Session invalide')
        }
    )
    def get(self, request):
        """Return the user described by the ``Authorization`` bearer token.

        Responds with 200 and a ``user`` object (id, email,
        roleIndexLoginDefault, roles) when the token decodes and the profile
        exists. Responds with 401 when the token is expired, invalid, or
        refers to a profile that no longer exists.
        """
        token = request.META.get('HTTP_AUTHORIZATION', '').split('Bearer ')[-1]

        try:
            # Decode with the same key/algorithm makeToken() signs with, so
            # the two can never drift apart.
            decoded_token = jwt.decode(
                token,
                settings.SIMPLE_JWT['SIGNING_KEY'],
                algorithms=[settings.SIMPLE_JWT['ALGORITHM']],
            )
            user = Profile.objects.get(id=decoded_token.get('user_id'))
        except jwt.ExpiredSignatureError:
            return JsonResponse({"error": "Token has expired"}, status=status.HTTP_401_UNAUTHORIZED)
        except jwt.InvalidTokenError:
            return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
        except Profile.DoesNotExist:
            # A well-signed token for a deleted profile is not a valid session.
            return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)

        roles = ProfileRole.objects.filter(profile=user).values('role_type', 'establishment__name')
        response_data = {
            'user': {
                'id': user.id,
                'email': user.email,
                'roleIndexLoginDefault': user.roleIndexLoginDefault,
                'roles': list(roles),
            }
        }
        return JsonResponse(response_data, status=status.HTTP_200_OK)
|
|
|
|
class ProfileView(APIView):
    """Collection endpoint for profiles: list all of them or create one."""

    @swagger_auto_schema(
        operation_description="Obtenir la liste des profils",
        responses={200: ProfileSerializer(many=True)}
    )
    def get(self, request):
        """Return every profile, serialized as a JSON list."""
        all_profiles = bdd.getAllObjects(_objectName=Profile)
        serialized = ProfileSerializer(all_profiles, many=True)
        return JsonResponse(serialized.data, safe=False)

    @swagger_auto_schema(
        operation_description="Créer un nouveau profil",
        request_body=ProfileSerializer,
        responses={
            200: ProfileSerializer,
            400: 'Données invalides'
        }
    )
    def post(self, request):
        """Create a profile from the JSON body.

        Returns the serialized profile on success, or the serializer errors
        with status 400 when the payload is invalid.
        """
        serializer = ProfileSerializer(data=JSONParser().parse(request))

        if not serializer.is_valid():
            return JsonResponse(serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

        serializer.save()
        return JsonResponse(serializer.data, safe=False)
|
|
|
|
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileSimpleView(APIView):
    """Item endpoint for a single profile: read, update or delete by ID."""

    @swagger_auto_schema(
        operation_description="Obtenir un profil par son ID",
        responses={200: ProfileSerializer}
    )
    def get(self, request, id):
        """Return the serialized profile looked up by ``id``."""
        # NOTE(review): behaviour for an unknown id depends on what
        # bdd.getObject returns in that case - confirm against bdd.
        profil = bdd.getObject(Profile, "id", id)
        profil_serializer = ProfileSerializer(profil)
        return JsonResponse(profil_serializer.data, safe=False)

    @swagger_auto_schema(
        operation_description="Mettre à jour un profil",
        request_body=ProfileSerializer,
        responses={
            200: 'Mise à jour réussie',
            400: 'Données invalides'
        }
    )
    def put(self, request, id):
        """Update the profile identified by ``id`` from the JSON body.

        Returns the serialized profile on success, 400 with the serializer
        errors when the payload is invalid, and 404 when no profile has this
        ID (previously an unhandled ``DoesNotExist`` produced a 500).
        """
        data = JSONParser().parse(request)

        try:
            profil = Profile.objects.get(id=id)
        except Profile.DoesNotExist:
            return JsonResponse({"error": "Profil non trouvé."}, status=status.HTTP_404_NOT_FOUND)

        profil_serializer = ProfileSerializer(profil, data=data)
        if profil_serializer.is_valid():
            profil_serializer.save()
            return JsonResponse(profil_serializer.data, safe=False)

        return JsonResponse(profil_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

    @swagger_auto_schema(
        operation_description="Supprimer un profil",
        responses={200: 'Suppression réussie'}
    )
    def delete(self, request, id):
        """Delete the profile identified by ``id``.

        The response is whatever ``bdd.delete_object`` returns.
        """
        return bdd.delete_object(Profile, id)
|
|
|
|
@method_decorator(csrf_exempt, name='dispatch')
class LoginView(APIView):
    """Authenticate a user by email/password and issue JWT tokens."""

    @swagger_auto_schema(
        operation_description="Connexion utilisateur",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'password'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING),
                'password': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Connexion réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'token': openapi.Schema(type=openapi.TYPE_STRING),
                    'refresh': openapi.Schema(type=openapi.TYPE_STRING)
                }
            )),
            400: openapi.Response('Connexion échouée', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING)
                }
            ))
        }
    )
    def post(self, request):
        """Log the user in and return an access/refresh token pair.

        Responses:
            200 - ``token`` and ``refresh`` when the credentials are valid
                  and the profile has at least one active role.
            401 - the credentials are valid but the profile has no active role.
            500 - the tokens could not be generated (``makeToken`` returned
                  ``None``); previously this crashed on tuple unpacking.
            400 - validation failed or the credentials are wrong; the body
                  carries ``errorFields`` and ``errorMessage``.
        """
        data = JSONParser().parse(request)
        validatorAuthentication = validator.ValidatorAuthentication(data=data)
        validationOk, errorFields = validatorAuthentication.validate()

        user = None
        if validationOk:
            user = authenticate(
                email=data.get('email'),
                password=data.get('password'),
            )

        if user is not None:
            # The user must have at least one active role to log in.
            has_active_role = ProfileRole.objects.filter(profile=user, is_active=True).first()
            if not has_active_role:
                return JsonResponse({"errorMessage": "Profil inactif"}, status=status.HTTP_401_UNAUTHORIZED)

            login(request, user)
            user.save()

            # makeToken() signals failure by returning None instead of a pair.
            tokens = makeToken(user)
            if tokens is None:
                return JsonResponse(
                    {"errorMessage": "Erreur lors de la création du token"},
                    status=status.HTTP_500_INTERNAL_SERVER_ERROR,
                )
            access_token, refresh_token = tokens

            return JsonResponse({
                'token': access_token,
                'refresh': refresh_token
            }, safe=False)

        # Validation failure and wrong credentials share the same message.
        return JsonResponse({
            'errorFields': errorFields,
            'errorMessage': error.returnMessage[error.WRONG_ID],
        }, safe=False, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
def makeToken(user):
    """Create a JWT access/refresh token pair for ``user``.

    The access token carries the user id, email, ``roleIndexLoginDefault``
    and one entry per active role (with the related establishment's id, name,
    evaluation frequency, total capacity and logo URL). The refresh token
    carries only the user id. Both are signed with
    ``settings.SIMPLE_JWT['SIGNING_KEY']`` / ``['ALGORITHM']`` and expire
    after the matching ``ACCESS_TOKEN_LIFETIME`` / ``REFRESH_TOKEN_LIFETIME``.

    Returns:
        ``(access_token, refresh_token)`` on success, or ``None`` when
        anything fails (the error is logged with its traceback). Callers
        must check for ``None`` before unpacking.
    """
    # Local import: the module only imports ``datetime`` and ``timedelta``.
    from datetime import timezone

    try:
        # Collect every active role of the user.
        roles_qs = ProfileRole.objects.filter(profile=user, is_active=True).select_related('establishment')
        roles = []
        for role in roles_qs:
            establishment = role.establishment
            # Expose the logo URL only when a logo is actually set.
            logo_url = f"{establishment.logo.url}" if establishment.logo else ""
            roles.append({
                "role_type": role.role_type,
                "establishment__id": establishment.id,
                "establishment__name": establishment.name,
                "establishment__evaluation_frequency": establishment.evaluation_frequency,
                "establishment__total_capacity": establishment.total_capacity,
                "establishment__logo": logo_url,
            })

        # One timezone-aware timestamp for both tokens: datetime.utcnow() is
        # deprecated, and a single instant keeps iat/exp consistent.
        now = datetime.now(timezone.utc)
        signing_key = settings.SIMPLE_JWT['SIGNING_KEY']
        algorithm = settings.SIMPLE_JWT['ALGORITHM']

        access_payload = {
            'user_id': user.id,
            'email': user.email,
            'roleIndexLoginDefault': user.roleIndexLoginDefault,
            'roles': roles,
            'type': 'access',
            'exp': now + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'],
            'iat': now,
        }
        access_token = jwt.encode(access_payload, signing_key, algorithm=algorithm)

        refresh_payload = {
            'user_id': user.id,
            'type': 'refresh',
            'exp': now + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'],
            'iat': now,
        }
        refresh_token = jwt.encode(refresh_payload, signing_key, algorithm=algorithm)

        return access_token, refresh_token
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"Erreur lors de la création du token: {str(e)}")
        return None
|
|
|
|
# csrf_exempt is applied to dispatch at class level, as LoginView does; the
# original put it on post() with name='dispatch', where the name is ignored.
@method_decorator(csrf_exempt, name='dispatch')
class RefreshJWTView(APIView):
    """Exchange a valid refresh token for a new access/refresh token pair."""

    @swagger_auto_schema(
        operation_description="Rafraîchir le token d'accès",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['refresh'],
            properties={
                'refresh': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Token rafraîchi avec succès', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'token': openapi.Schema(type=openapi.TYPE_STRING),
                    'refresh': openapi.Schema(type=openapi.TYPE_STRING),
                }
            )),
            400: openapi.Response('Échec du rafraîchissement', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING)
                }
            ))
        }
    )
    def post(self, request):
        """Validate the ``refresh`` token of the body and issue new tokens.

        Responses:
            200 - ``token`` and ``refresh`` (new pair).
            400 - token missing, malformed, expired, invalid, not of type
                  ``refresh``, or any unexpected error while processing it.
            404 - the user referenced by the token no longer exists.
            500 - the new tokens could not be generated.
        """
        data = JSONParser().parse(request)
        refresh_token = data.get("refresh")

        # Validate presence and type BEFORE slicing the token for the log
        # line; the original sliced first and crashed on a missing token.
        if not refresh_token:
            return JsonResponse({'errorMessage': 'Refresh token manquant'}, status=400)
        if not isinstance(refresh_token, str):
            return JsonResponse({'errorMessage': 'Format de token invalide'}, status=400)

        # Only a prefix is logged so the full token never reaches the logs.
        logger.info(f"Token reçu: {refresh_token[:20]}...")

        try:
            logger.info("Tentative de décodage du token")
            logger.info(f"Algorithme utilisé: {settings.SIMPLE_JWT['ALGORITHM']}")

            # A JWT is three dot-separated segments; reject anything else early.
            if len(refresh_token.split('.')) != 3:
                logger.error("Format de token invalide - pas 3 parties")
                return JsonResponse({'errorMessage': 'Format de token invalide'}, status=400)

            payload = jwt.decode(
                refresh_token,
                settings.SIMPLE_JWT['SIGNING_KEY'],
                algorithms=[settings.SIMPLE_JWT['ALGORITHM']],  # must be a list
            )
            logger.info(f"Token décodé avec succès. Type: {payload.get('type')}")

            # Only refresh tokens may be used here.
            if payload.get('type') != 'refresh':
                return JsonResponse({'errorMessage': 'Token invalide'}, status=400)

            # .get() raises instead of returning None, so the 404 has to be
            # produced from the exception.
            try:
                user = Profile.objects.get(id=payload['user_id'])
            except Profile.DoesNotExist:
                return JsonResponse({'errorMessage': 'Utilisateur non trouvé'}, status=404)

            # makeToken() returns None on failure rather than a pair.
            tokens = makeToken(user)
            if tokens is None:
                return JsonResponse({'errorMessage': 'Erreur lors de la création du token'}, status=500)
            new_access_token, new_refresh_token = tokens

            return JsonResponse({'token': new_access_token, 'refresh': new_refresh_token}, status=200)

        except ExpiredSignatureError as e:
            logger.error(f"Token expiré: {str(e)}")
            return JsonResponse({'errorMessage': 'Refresh token expiré'}, status=400)
        except InvalidTokenError as e:
            logger.error(f"Token invalide: {str(e)}")
            return JsonResponse({'errorMessage': f'Token invalide: {str(e)}'}, status=400)
        except Exception as e:
            logger.error(f"Erreur inattendue: {str(e)}")
            return JsonResponse({'errorMessage': f'Erreur inattendue: {str(e)}'}, status=400)
|
|
|
|
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class SubscribeView(APIView):
    """Activate an existing profile for an establishment and set its password."""

    @swagger_auto_schema(
        operation_description="Inscription utilisateur",
        manual_parameters=[
            openapi.Parameter(
                'establishment_id', openapi.IN_QUERY,
                description="ID de l'établissement",
                type=openapi.TYPE_INTEGER,
                required=True
            )
        ],
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'password1', 'password2'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING),
                'password1': openapi.Schema(type=openapi.TYPE_STRING),
                'password2': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Inscription réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'message': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT),
                    'id': openapi.Schema(type=openapi.TYPE_INTEGER)
                }
            ))
        }
    )
    def post(self, request):
        """Set the password of an existing profile and activate its role.

        ``establishment_id`` is read from the JSON body and, when absent
        there, from the query string (which is where the API schema declares
        it). A request without it gets a 400 instead of a ``KeyError``.

        The body always carries ``message``, ``errorMessage``,
        ``errorFields`` and ``id`` (``-1`` when no profile could be
        resolved), except when the role serializer rejects the data, in
        which case its errors are returned with status 400.
        """
        retourErreur = ''
        retour = ''
        newProfilConnection = JSONParser().parse(request)

        validatorSubscription = validator.ValidatorSubscription(data=newProfilConnection)
        validationOk, errorFields = validatorSubscription.validate()

        establishment_id = newProfilConnection.get('establishment_id')
        if establishment_id is None:
            establishment_id = request.GET.get('establishment_id')
        if establishment_id is None:
            return JsonResponse(
                {'message': retour, 'errorMessage': 'establishment_id est requis', "errorFields": errorFields, "id": -1},
                safe=False,
                status=status.HTTP_400_BAD_REQUEST,
            )

        if validationOk:
            # The email must belong to an existing profile; otherwise report an error.
            profil = bdd.getProfile(Profile.objects.all(), newProfilConnection.get('email'))
            if profil is None:
                retourErreur = error.returnMessage[error.PROFIL_NOT_EXISTS]
            elif ProfileRole.objects.filter(profile=profil, establishment=establishment_id, is_active=True).exists():
                # The profile already has an active role for this establishment.
                retourErreur = error.returnMessage[error.PROFIL_ACTIVE]
                return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)
            else:
                try:
                    profil.set_password(newProfilConnection.get('password1'))
                    profil.full_clean()

                    # Resolve (or validate) the role BEFORE persisting anything,
                    # so an invalid role cannot leave the password changed.
                    profile_role = ProfileRole.objects.filter(profile=profil, establishment=establishment_id).first()
                    role_serializer = None
                    if profile_role is None:
                        # No ProfileRole yet for this pair: prepare a new one.
                        role_data = {
                            'profile': profil.id,
                            'establishment': establishment_id,
                            'is_active': True
                        }
                        role_serializer = ProfileRoleSerializer(data=role_data)
                        if not role_serializer.is_valid():
                            return JsonResponse(role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

                    profil.save()
                    if profile_role is not None:
                        profile_role.is_active = True
                        profile_role.save()
                    else:
                        role_serializer.save()

                    retour = error.returnMessage[error.MESSAGE_ACTIVATION_PROFILE]
                    retourErreur = ''
                    return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)
                except ValidationError:
                    retourErreur = error.returnMessage[error.WRONG_MAIL_FORMAT]
                    # "id" added so this response has the same keys as the others.
                    return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": profil.id}, safe=False)

        return JsonResponse({'message': retour, 'errorMessage': retourErreur, "errorFields": errorFields, "id": -1}, safe=False)
|
|
|
|
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class NewPasswordView(APIView):
    """Handle "forgot password" requests by emailing a temporary reset code."""

    @swagger_auto_schema(
        operation_description="Demande de nouveau mot de passe",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Demande réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'message': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT)
                }
            ))
        }
    )
    def post(self, request):
        """Generate a reset code for the given email and send it by mail.

        The response always has ``message``, ``errorMessage`` and
        ``errorFields``. ``errorMessage`` is set when no profile matches the
        email or when a ``ValidationError`` is raised while preparing the
        reset.
        """
        payload = JSONParser().parse(request)
        validationOk, errorFields = validator.ValidatorNewPassword(data=payload).validate()

        if not validationOk:
            return JsonResponse({'message': '', 'errorMessage': '', "errorFields": errorFields}, safe=False)

        email = payload.get('email')
        profil = bdd.getProfile(Profile.objects.all(), email)
        if profil is None:
            return JsonResponse(
                {'message': '', 'errorMessage': error.returnMessage[error.PROFIL_NOT_EXISTS], "errorFields": errorFields},
                safe=False,
            )

        message = ''
        error_message = ''
        try:
            # Store a temporary code and its expiry date on the profile; the
            # code is what the reset URL is built from.
            profil.code = util.genereRandomCode(12)
            profil.datePeremption = util.calculeDatePeremption(util._now(), settings.EXPIRATION_URL_NB_DAYS)
            profil.save()
            message = error.returnMessage[error.MESSAGE_REINIT_PASSWORD] % (email)
            mailer.envoieReinitMotDePasse(email, profil.code)
        except ValidationError:
            error_message = error.returnMessage[error.WRONG_MAIL_FORMAT]

        return JsonResponse({'message': message, 'errorMessage': error_message, "errorFields": errorFields}, safe=False)
|
|
|
|
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ResetPasswordView(APIView):
    """Set a new password for the profile that owns a reset code."""

    # Format used for the stored expiry date and for the comparison.
    _DATE_FORMAT = '%d-%m-%Y %H:%M'

    @staticmethod
    def _link_has_expired(date_peremption):
        """Return True when ``date_peremption`` is in the past or unreadable.

        The current time goes through the same string format as the stored
        value, so both sides are compared at the same (minute) precision. A
        value that cannot be parsed is treated as expired rather than
        crashing the request.
        """
        fmt = ResetPasswordView._DATE_FORMAT
        now = datetime.strptime(util.convertToStr(util._now(), fmt), fmt)
        try:
            return now > datetime.strptime(date_peremption, fmt)
        except (TypeError, ValueError):
            return True

    @swagger_auto_schema(
        operation_description="Réinitialisation du mot de passe",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['password1', 'password2'],
            properties={
                'password1': openapi.Schema(type=openapi.TYPE_STRING),
                'password2': openapi.Schema(type=openapi.TYPE_STRING)
            }
        ),
        responses={
            200: openapi.Response('Réinitialisation réussie', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'message': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorMessage': openapi.Schema(type=openapi.TYPE_STRING),
                    'errorFields': openapi.Schema(type=openapi.TYPE_OBJECT)
                }
            ))
        }
    )
    def post(self, request, code):
        """Change the password of the profile identified by the reset ``code``.

        The response always has ``message``, ``errorMessage`` and
        ``errorFields``. An unknown code or an expired link yields the
        ``EXPIRED_URL`` error message; previously an unknown code returned
        an empty message and an empty error. On success the code and expiry
        date are cleared so the link cannot be reused.
        """
        retourErreur = ''
        retour = ''
        newProfilConnection = JSONParser().parse(request)

        validatorResetPassword = validator.ValidatorResetPassword(data=newProfilConnection)
        validationOk, errorFields = validatorResetPassword.validate()

        profil = bdd.getObject(Profile, "code", code)
        if not profil:
            # No profile owns this code: report it as an unusable link.
            retourErreur = error.returnMessage[error.EXPIRED_URL]
        elif self._link_has_expired(profil.datePeremption):
            retourErreur = error.returnMessage[error.EXPIRED_URL]
        elif validationOk:
            retour = error.returnMessage[error.PASSWORD_CHANGED]

            profil.set_password(newProfilConnection.get('password1'))
            profil.code = ''
            profil.datePeremption = ''
            profil.save()

        return JsonResponse({'message': retour, "errorMessage": retourErreur, "errorFields": errorFields}, safe=False)
|
|
|
|
class ProfileRoleView(APIView):
    """Collection endpoint for profile roles: filtered, paginated list and creation."""

    pagination_class = CustomProfilesPagination

    @swagger_auto_schema(
        operation_description="Obtenir la liste des profile_roles",
        responses={200: ProfileRoleSerializer(many=True)}
    )
    def get(self, request):
        """Return a page of the profile roles of one establishment.

        Query parameters:
            establishment_id - required; 400 when missing.
            filter - ``parents`` or ``school``; any other value gives 400.

        Results are ordered by ``updated_date`` descending and paginated
        with ``pagination_class``. When nothing matches, the body is
        ``{'error': 'aucune donnée trouvée', 'count': 0}``.
        """
        # The original also parsed ``page_size`` into a local that was never
        # used afterwards; that dead code is gone.
        role_filter = request.GET.get('filter', '').strip()
        establishment_id = request.GET.get('establishment_id', None)

        if establishment_id is None:
            return JsonResponse({'error': 'establishment_id est requis'}, safe=False, status=status.HTTP_400_BAD_REQUEST)

        # Select the ProfileRole rows matching the requested filter.
        profiles_roles_List = ProfileRole.objects.filter(establishment_id=establishment_id)

        if role_filter == 'parents':
            profiles_roles_List = profiles_roles_List.filter(role_type=ProfileRole.RoleType.PROFIL_PARENT)
        elif role_filter == 'school':
            profiles_roles_List = profiles_roles_List.filter(
                Q(role_type=ProfileRole.RoleType.PROFIL_ECOLE) |
                Q(role_type=ProfileRole.RoleType.PROFIL_ADMIN)
            )
        else:
            return JsonResponse({'error': 'Filtre invalide'}, safe=False, status=status.HTTP_400_BAD_REQUEST)

        # Most recently updated first.
        profiles_roles_List = profiles_roles_List.distinct().order_by('-updated_date')

        # exists() asks the database for one row; testing the queryset's
        # truthiness would load every matching row just to check emptiness.
        if not profiles_roles_List.exists():
            return JsonResponse({'error': 'aucune donnée trouvée', 'count': 0}, safe=False)

        paginator = self.pagination_class()
        page = paginator.paginate_queryset(profiles_roles_List, request)

        if page is not None:
            profile_roles_serializer = ProfileRoleSerializer(page, many=True)
            response_data = paginator.get_paginated_response(profile_roles_serializer.data)
            return JsonResponse(response_data, safe=False)

        return JsonResponse({'error': 'aucune donnée trouvée', 'count': 0}, safe=False)

    @swagger_auto_schema(
        operation_description="Créer un nouveau profile_role",
        request_body=ProfileRoleSerializer,
        responses={
            200: ProfileRoleSerializer,
            400: 'Données invalides'
        }
    )
    def post(self, request):
        """Create a profile role from the JSON body.

        Returns the serialized role on success, or the serializer errors
        with status 400 when the payload is invalid.
        """
        profile_role_data = JSONParser().parse(request)
        profile_role_serializer = ProfileRoleSerializer(data=profile_role_data)

        if profile_role_serializer.is_valid():
            profile_role_serializer.save()
            return JsonResponse(profile_role_serializer.data, safe=False)

        return JsonResponse(profile_role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileRoleSimpleView(APIView):
    """Item endpoint for a single profile role: read, update or delete by ID."""

    @swagger_auto_schema(
        operation_description="Obtenir un profile_role par son ID",
        responses={200: ProfileRoleSerializer}
    )
    def get(self, request, id):
        """Return the serialized profile role looked up by ``id``."""
        # NOTE(review): behaviour for an unknown id depends on what
        # bdd.getObject returns in that case - confirm against bdd.
        profile_role = bdd.getObject(ProfileRole, "id", id)
        profile_role_serializer = ProfileRoleSerializer(profile_role)
        return JsonResponse(profile_role_serializer.data, safe=False)

    @swagger_auto_schema(
        operation_description="Mettre à jour un profile_role",
        request_body=ProfileRoleSerializer,
        responses={
            200: 'Mise à jour réussie',
            400: 'Données invalides'
        }
    )
    def put(self, request, id):
        """Update the profile role identified by ``id`` from the JSON body.

        Returns the serialized role on success, 400 with the serializer
        errors when the payload is invalid, and 404 when the role does not
        exist (same response as ``delete``; previously a 500).
        """
        data = JSONParser().parse(request)

        try:
            profile_role = ProfileRole.objects.get(id=id)
        except ProfileRole.DoesNotExist:
            return JsonResponse(
                {"error": "ProfileRole non trouvé."},
                status=status.HTTP_404_NOT_FOUND
            )

        profile_role_serializer = ProfileRoleSerializer(profile_role, data=data)
        if profile_role_serializer.is_valid():
            profile_role_serializer.save()
            return JsonResponse(profile_role_serializer.data, safe=False)

        return JsonResponse(profile_role_serializer.errors, safe=False, status=status.HTTP_400_BAD_REQUEST)

    @swagger_auto_schema(
        operation_description="Supprimer un profile_role",
        responses={200: 'Suppression réussie'}
    )
    def delete(self, request, id):
        """Delete a profile role, and its profile when no role remains.

        A parent role is not deleted (400) when its guardian is the only
        guardian of at least one student. Returns 404 when the role does not
        exist and 500 with the error text for any other exception.
        """
        try:
            profile_role = ProfileRole.objects.get(id=id)
            profile = profile_role.profile

            # Parent roles need an extra check before they can be removed.
            if profile_role.role_type == ProfileRole.RoleType.PROFIL_PARENT:
                guardian = Guardian.objects.filter(profile_role=profile_role).first()
                if guardian:
                    # Refuse the deletion if any attached student would be
                    # left without another guardian.
                    for student in guardian.student_set.all():
                        other_guardians = student.guardians.exclude(id=guardian.id)
                        if not other_guardians.exists():
                            return JsonResponse(
                                {"error": f"Impossible de supprimer ce profil car l'élève {student.first_name} {student.last_name} n'aura plus de responsable légal."},
                                status=status.HTTP_400_BAD_REQUEST
                            )

            profile_role.delete()

            # Remove the profile too once it has no role left.
            if not ProfileRole.objects.filter(profile=profile).exists():
                profile.delete()

            return JsonResponse({'message': 'Suppression réussie'}, safe=False)

        except ProfileRole.DoesNotExist:
            return JsonResponse(
                {"error": "ProfileRole non trouvé."},
                status=status.HTTP_404_NOT_FOUND
            )
        except Exception as e:
            return JsonResponse(
                {"error": f"Une erreur est survenue : {str(e)}"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )