feat: Securisation du Backend

This commit is contained in:
Luc SORIGNET
2026-02-27 10:45:36 +01:00
parent 2fef6d61a4
commit fa843097ba
55 changed files with 2898 additions and 910 deletions

View File

@ -2,6 +2,12 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from Auth.models import Profile
from N3wtSchool import bdd
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import TokenError, InvalidToken
import logging
logger = logging.getLogger("Auth")
class EmailBackend(ModelBackend):
def authenticate(self, request, username=None, password=None, **kwargs):
@ -18,3 +24,45 @@ class EmailBackend(ModelBackend):
except Profile.DoesNotExist:
return None
class LoggingJWTAuthentication(JWTAuthentication):
"""
Surclasse JWTAuthentication pour loguer pourquoi un token Bearer est rejeté.
Cela aide à diagnostiquer les 401 sans avoir à ajouter des prints partout.
"""
def authenticate(self, request):
header = self.get_header(request)
if header is None:
logger.debug("JWT: pas de header Authorization dans la requête %s %s",
request.method, request.path)
return None
raw_token = self.get_raw_token(header)
if raw_token is None:
logger.debug("JWT: header Authorization présent mais token vide pour %s %s",
request.method, request.path)
return None
try:
validated_token = self.get_validated_token(raw_token)
except InvalidToken as e:
logger.warning(
"JWT: token invalide pour %s %s%s",
request.method, request.path, str(e)
)
raise
try:
user = self.get_user(validated_token)
except Exception as e:
logger.warning(
"JWT: utilisateur introuvable pour %s %s%s",
request.method, request.path, str(e)
)
raise
logger.debug("JWT: authentification réussie user_id=%s pour %s %s",
user.pk, request.method, request.path)
return user, validated_token

View File

@ -1,4 +1,4 @@
# Generated by Django 5.1.3 on 2025-11-30 11:02
# Generated by Django 5.1.3 on 2026-03-14 13:23
import django.contrib.auth.models
import django.contrib.auth.validators
@ -24,7 +24,7 @@ class Migration(migrations.Migration):
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('role_type', models.IntegerField(choices=[(-1, 'NON DEFINI'), (0, 'ECOLE'), (1, 'ADMIN'), (2, 'PARENT')], default=-1)),
('is_active', models.BooleanField(default=False)),
('is_active', models.BooleanField(blank=True, default=False)),
('updated_date', models.DateTimeField(auto_now=True)),
('establishment', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='profile_roles', to='Establishment.establishment')),
],

553
Back-End/Auth/tests.py Normal file
View File

@ -0,0 +1,553 @@
"""
Tests unitaires pour le module Auth.
Vérifie :
- L'accès public aux endpoints de login/CSRF/subscribe
- La protection JWT des endpoints protégés (profils, rôles, session)
- La génération et validation des tokens JWT
"""
import json
from django.test import TestCase, override_settings
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient
from rest_framework_simplejwt.tokens import RefreshToken
from Auth.models import Profile, ProfileRole
from Establishment.models import Establishment
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def create_establishment():
"""Crée un établissement minimal utilisé dans les tests."""
return Establishment.objects.create(
name="Ecole Test",
address="1 rue de l'Ecole",
total_capacity=100,
establishment_type=[1],
)
def create_user(email="test@example.com", password="testpassword123"):
"""Crée un utilisateur (Profile) de test."""
user = Profile.objects.create_user(
username=email,
email=email,
password=password,
)
return user
def create_active_user_with_role(email="active@example.com", password="testpassword123"):
"""Crée un utilisateur avec un rôle actif."""
user = create_user(email=email, password=password)
establishment = create_establishment()
ProfileRole.objects.create(
profile=user,
role_type=ProfileRole.RoleType.PROFIL_ADMIN,
establishment=establishment,
is_active=True,
)
return user
def get_jwt_token(user):
"""Retourne un token d'accès JWT pour l'utilisateur donné."""
refresh = RefreshToken.for_user(user)
return str(refresh.access_token)
# ---------------------------------------------------------------------------
# Tests endpoints publics
# ---------------------------------------------------------------------------
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class CsrfEndpointTest(TestCase):
"""Test de l'endpoint CSRF doit être accessible sans authentification."""
def setUp(self):
self.client = APIClient()
def test_csrf_endpoint_accessible_sans_auth(self):
"""GET /Auth/csrf doit retourner 200 sans token."""
response = self.client.get(reverse("Auth:csrf"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("csrfToken", response.json())
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class LoginEndpointTest(TestCase):
"""Tests de l'endpoint de connexion."""
def setUp(self):
self.client = APIClient()
self.url = reverse("Auth:login")
self.user = create_active_user_with_role(
email="logintest@example.com", password="secureP@ss1"
)
def test_login_avec_identifiants_valides(self):
"""POST /Auth/login avec identifiants valides retourne 200 et un token."""
payload = {"email": "logintest@example.com", "password": "secureP@ss1"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertIn("token", data)
self.assertIn("refresh", data)
def test_login_avec_mauvais_mot_de_passe(self):
"""POST /Auth/login avec mauvais mot de passe retourne 400 ou 401."""
payload = {"email": "logintest@example.com", "password": "wrongpassword"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertIn(response.status_code, [status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED])
def test_login_avec_email_inexistant(self):
"""POST /Auth/login avec email inconnu retourne 400 ou 401."""
payload = {"email": "unknown@example.com", "password": "anypassword"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertIn(response.status_code, [status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED])
def test_login_accessible_sans_authentification(self):
"""L'endpoint de login doit être accessible sans token JWT."""
# On vérifie juste que l'on n'obtient pas 401/403 pour raison d'auth manquante
payload = {"email": "logintest@example.com", "password": "secureP@ss1"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertNotEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
self.assertNotEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class RefreshJWTEndpointTest(TestCase):
"""Tests de l'endpoint de rafraîchissement du token JWT."""
def setUp(self):
self.client = APIClient()
self.url = reverse("Auth:refresh_jwt")
self.user = create_active_user_with_role(email="refresh@example.com")
def test_refresh_avec_token_valide(self):
"""POST /Auth/refreshJWT avec refresh token valide retourne un nouvel access token."""
import jwt, uuid
from datetime import datetime, timedelta
from django.conf import settings as django_settings
# RefreshJWTView attend le format custom (type='refresh'), pas le format SimpleJWT
refresh_payload = {
'user_id': self.user.id,
'type': 'refresh',
'jti': str(uuid.uuid4()),
'exp': datetime.utcnow() + timedelta(days=1),
'iat': datetime.utcnow(),
}
custom_refresh = jwt.encode(
refresh_payload,
django_settings.SIMPLE_JWT['SIGNING_KEY'],
algorithm=django_settings.SIMPLE_JWT['ALGORITHM'],
)
payload = {"refresh": custom_refresh}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("token", response.json())
def test_refresh_avec_token_invalide(self):
"""POST /Auth/refreshJWT avec token invalide retourne 401."""
payload = {"refresh": "invalid.token.here"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertIn(response.status_code, [status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED])
def test_refresh_accessible_sans_authentification(self):
"""L'endpoint de refresh doit être accessible sans token d'accès."""
refresh = RefreshToken.for_user(self.user)
payload = {"refresh": str(refresh)}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertNotEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# ---------------------------------------------------------------------------
# Tests endpoints protégés Session
# ---------------------------------------------------------------------------
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class SessionEndpointTest(TestCase):
"""Tests de l'endpoint d'information de session."""
def setUp(self):
self.client = APIClient()
self.url = reverse("Auth:infoSession")
self.user = create_active_user_with_role(email="session@example.com")
def test_info_session_sans_token_retourne_401(self):
"""GET /Auth/infoSession sans token doit retourner 401."""
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_info_session_avec_token_valide_retourne_200(self):
"""GET /Auth/infoSession avec token valide doit retourner 200 et les données utilisateur."""
token = get_jwt_token(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertIn("user", data)
self.assertEqual(data["user"]["email"], self.user.email)
def test_info_session_avec_token_invalide_retourne_401(self):
"""GET /Auth/infoSession avec token invalide doit retourner 401."""
self.client.credentials(HTTP_AUTHORIZATION="Bearer token.invalide.xyz")
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_info_session_avec_token_expire_retourne_401(self):
"""GET /Auth/infoSession avec un token expiré doit retourner 401."""
import jwt
from datetime import datetime, timedelta
from django.conf import settings as django_settings
expired_payload = {
'user_id': self.user.id,
'exp': datetime.utcnow() - timedelta(hours=1),
}
expired_token = jwt.encode(
expired_payload, django_settings.SECRET_KEY, algorithm='HS256'
)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {expired_token}")
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# ---------------------------------------------------------------------------
# Tests endpoints protégés Profils
# ---------------------------------------------------------------------------
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
REST_FRAMEWORK={
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_simplejwt.authentication.JWTAuthentication',
),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
},
)
class ProfileEndpointAuthTest(TestCase):
"""Tests d'authentification sur les endpoints de profils."""
def setUp(self):
self.client = APIClient()
self.profiles_url = reverse("Auth:profile")
self.user = create_active_user_with_role(email="profile_auth@example.com")
def test_get_profiles_sans_auth_retourne_401(self):
"""GET /Auth/profiles sans token doit retourner 401."""
response = self.client.get(self.profiles_url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_get_profiles_avec_auth_retourne_200(self):
"""GET /Auth/profiles avec token valide doit retourner 200."""
token = get_jwt_token(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")
response = self.client.get(self.profiles_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_post_profile_sans_auth_retourne_401(self):
"""POST /Auth/profiles sans token doit retourner 401."""
payload = {"email": "new@example.com", "password": "pass123"}
response = self.client.post(
self.profiles_url,
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_get_profile_par_id_sans_auth_retourne_401(self):
"""GET /Auth/profiles/{id} sans token doit retourner 401."""
url = reverse("Auth:profile", kwargs={"id": self.user.id})
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_put_profile_sans_auth_retourne_401(self):
"""PUT /Auth/profiles/{id} sans token doit retourner 401."""
url = reverse("Auth:profile", kwargs={"id": self.user.id})
payload = {"email": self.user.email}
response = self.client.put(
url, data=json.dumps(payload), content_type="application/json"
)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_delete_profile_sans_auth_retourne_401(self):
"""DELETE /Auth/profiles/{id} sans token doit retourner 401."""
url = reverse("Auth:profile", kwargs={"id": self.user.id})
response = self.client.delete(url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# ---------------------------------------------------------------------------
# Tests endpoints protégés ProfileRole
# ---------------------------------------------------------------------------
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
REST_FRAMEWORK={
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_simplejwt.authentication.JWTAuthentication',
),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
},
)
class ProfileRoleEndpointAuthTest(TestCase):
"""Tests d'authentification sur les endpoints de rôles."""
def setUp(self):
self.client = APIClient()
self.profile_roles_url = reverse("Auth:profileRoles")
self.user = create_active_user_with_role(email="roles_auth@example.com")
def test_get_profile_roles_sans_auth_retourne_401(self):
"""GET /Auth/profileRoles sans token doit retourner 401."""
response = self.client.get(self.profile_roles_url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_get_profile_roles_avec_auth_retourne_200(self):
"""GET /Auth/profileRoles avec token valide doit retourner 200."""
token = get_jwt_token(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")
response = self.client.get(self.profile_roles_url)
self.assertNotIn(
response.status_code,
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN],
msg="Un token valide ne doit pas être rejeté par la couche d'authentification",
)
def test_post_profile_role_sans_auth_retourne_401(self):
"""POST /Auth/profileRoles sans token doit retourner 401."""
payload = {"profile": self.user.id, "role_type": 1}
response = self.client.post(
self.profile_roles_url,
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# ---------------------------------------------------------------------------
# Tests de génération de token JWT
# ---------------------------------------------------------------------------
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class JWTTokenGenerationTest(TestCase):
"""Tests de génération et validation des tokens JWT."""
def setUp(self):
self.user = create_user(email="jwt@example.com", password="jwttest123")
def test_generation_token_valide(self):
"""Un token généré pour un utilisateur est valide et contient user_id."""
import jwt
from django.conf import settings as django_settings
token = get_jwt_token(self.user)
self.assertIsNotNone(token)
self.assertIsInstance(token, str)
decoded = jwt.decode(token, django_settings.SECRET_KEY, algorithms=["HS256"])
self.assertEqual(decoded["user_id"], self.user.id)
def test_refresh_token_permet_obtenir_nouvel_access_token(self):
"""Le refresh token permet d'obtenir un nouvel access token via SimpleJWT."""
refresh = RefreshToken.for_user(self.user)
access = refresh.access_token
self.assertIsNotNone(str(access))
self.assertIsNotNone(str(refresh))
def test_token_different_par_utilisateur(self):
"""Deux utilisateurs différents ont des tokens différents."""
user2 = create_user(email="jwt2@example.com", password="jwttest123")
token1 = get_jwt_token(self.user)
token2 = get_jwt_token(user2)
self.assertNotEqual(token1, token2)
# ---------------------------------------------------------------------------
# Tests de sécurité — Correction des vulnérabilités identifiées
# ---------------------------------------------------------------------------
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class SessionViewTokenTypeTest(TestCase):
"""
SessionView doit rejeter les refresh tokens.
Avant la correction, jwt.decode() était appelé sans vérification du claim 'type',
ce qui permettait d'utiliser un refresh token là où seul un access token est attendu.
"""
def setUp(self):
self.client = APIClient()
self.url = reverse("Auth:infoSession")
self.user = create_active_user_with_role(email="session_type@example.com")
def test_refresh_token_rejete_par_session_view(self):
"""
Utiliser un refresh token SimpleJWT sur /infoSession doit retourner 401.
"""
import jwt
from datetime import datetime, timedelta
from django.conf import settings as django_settings
# Fabriquer manuellement un token de type 'refresh' signé avec la clé correcte
refresh_payload = {
'user_id': self.user.id,
'type': 'refresh', # ← type incorrect pour cet endpoint
'jti': 'test-refresh-jti',
'exp': datetime.utcnow() + timedelta(days=1),
'iat': datetime.utcnow(),
}
refresh_token = jwt.encode(
refresh_payload, django_settings.SECRET_KEY, algorithm='HS256'
)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
response = self.client.get(self.url)
self.assertEqual(
response.status_code, status.HTTP_401_UNAUTHORIZED,
"Un refresh token ne doit pas être accepté sur /infoSession (OWASP A07 - Auth Failures)"
)
def test_access_token_accepte_par_session_view(self):
"""Un access token de type 'access' est accepté."""
import jwt
from datetime import datetime, timedelta
from django.conf import settings as django_settings
access_payload = {
'user_id': self.user.id,
'type': 'access',
'jti': 'test-access-jti',
'exp': datetime.utcnow() + timedelta(minutes=15),
'iat': datetime.utcnow(),
}
access_token = jwt.encode(
access_payload, django_settings.SECRET_KEY, algorithm='HS256'
)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class RefreshJWTErrorLeakTest(TestCase):
"""
RefreshJWTView ne doit pas retourner les messages d'exception internes.
Avant la correction, str(e) était renvoyé directement au client.
"""
def setUp(self):
self.client = APIClient()
self.url = reverse("Auth:refresh_jwt")
def test_token_invalide_ne_revele_pas_details_internes(self):
"""
Un token invalide doit retourner un message générique, pas les détails de l'exception.
"""
payload = {"refresh": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.forged.signature"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
self.assertIn(response.status_code, [status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED])
body = response.content.decode()
# Le message ne doit pas contenir de traceback ou de détails internes de bibliothèque
self.assertNotIn("Traceback", body)
self.assertNotIn("jwt.exceptions", body)
self.assertNotIn("simplejwt", body.lower())
def test_erreur_reponse_est_generique(self):
"""
Le message d'erreur doit être 'Token invalide' (générique), pas le str(e).
"""
payload = {"refresh": "bad.token.data"}
response = self.client.post(
self.url, data=json.dumps(payload), content_type="application/json"
)
data = response.json()
self.assertIn('errorMessage', data)
# Le message doit être le message générique, pas la chaîne brute de l'exception
self.assertIn(data['errorMessage'], ['Token invalide', 'Format de token invalide',
'Refresh token expiré', 'Erreur interne du serveur'])
@override_settings(
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
SESSION_ENGINE='django.contrib.sessions.backends.db',
)
class SecurityHeadersTest(TestCase):
"""
Les en-têtes de sécurité HTTP doivent être présents dans toutes les réponses.
"""
def setUp(self):
self.client = APIClient()
def test_x_content_type_options_present(self):
"""X-Content-Type-Options: nosniff doit être présent."""
response = self.client.get(reverse("Auth:csrf"))
self.assertEqual(
response.get('X-Content-Type-Options'), 'nosniff',
"X-Content-Type-Options: nosniff doit être défini (prévient le MIME sniffing)"
)
def test_referrer_policy_present(self):
"""Referrer-Policy doit être présent."""
response = self.client.get(reverse("Auth:csrf"))
self.assertIsNotNone(
response.get('Referrer-Policy'),
"Referrer-Policy doit être défini"
)
def test_csp_frame_ancestors_present(self):
"""Content-Security-Policy doit contenir frame-ancestors."""
response = self.client.get(reverse("Auth:csrf"))
csp = response.get('Content-Security-Policy', '')
self.assertIn('frame-ancestors', csp,
"CSP doit définir frame-ancestors (protection clickjacking)")
self.assertIn("object-src 'none'", csp,
"CSP doit définir object-src 'none' (prévient les plugins malveillants)")

View File

@ -17,10 +17,12 @@ from datetime import datetime, timedelta
import jwt
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
import json
import uuid
from . import validator
from .models import Profile, ProfileRole
from rest_framework.decorators import action, api_view
from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.permissions import IsAuthenticated, AllowAny
from django.db.models import Q
from Auth.serializers import ProfileSerializer, ProfileRoleSerializer
@ -28,13 +30,28 @@ from Subscriptions.models import RegistrationForm, Guardian
import N3wtSchool.mailManager as mailer
import Subscriptions.util as util
import logging
from N3wtSchool import bdd, error, settings
from N3wtSchool import bdd, error
from rest_framework.throttling import AnonRateThrottle
from rest_framework_simplejwt.authentication import JWTAuthentication
logger = logging.getLogger("AuthViews")
class LoginRateThrottle(AnonRateThrottle):
"""Limite les tentatives de connexion à 10/min par IP.
Configurable via DEFAULT_THROTTLE_RATES['login'] dans settings.
"""
scope = 'login'
def get_rate(self):
try:
return super().get_rate()
except Exception:
# Fallback si le scope 'login' n'est pas configuré dans les settings
return '10/min'
@swagger_auto_schema(
method='get',
operation_description="Obtenir un token CSRF",
@ -43,11 +60,15 @@ logger = logging.getLogger("AuthViews")
}))}
)
@api_view(['GET'])
@permission_classes([AllowAny])
def csrf(request):
token = get_token(request)
return JsonResponse({'csrfToken': token})
class SessionView(APIView):
permission_classes = [AllowAny]
authentication_classes = [] # SessionView gère sa propre validation JWT
@swagger_auto_schema(
operation_description="Vérifier une session utilisateur",
manual_parameters=[openapi.Parameter('Authorization', openapi.IN_HEADER, type=openapi.TYPE_STRING, description='Bearer token')],
@ -70,6 +91,11 @@ class SessionView(APIView):
token = request.META.get('HTTP_AUTHORIZATION', '').split('Bearer ')[-1]
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
# Refuser les refresh tokens : seul le type 'access' est autorisé
# Accepter 'type' (format custom) ET 'token_type' (format SimpleJWT)
token_type_claim = decoded_token.get('type') or decoded_token.get('token_type')
if token_type_claim != 'access':
return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
userid = decoded_token.get('user_id')
user = Profile.objects.get(id=userid)
roles = ProfileRole.objects.filter(profile=user).values('role_type', 'establishment__name')
@ -88,6 +114,8 @@ class SessionView(APIView):
return JsonResponse({"error": "Invalid token"}, status=status.HTTP_401_UNAUTHORIZED)
class ProfileView(APIView):
permission_classes = [IsAuthenticated]
@swagger_auto_schema(
operation_description="Obtenir la liste des profils",
responses={200: ProfileSerializer(many=True)}
@ -118,6 +146,8 @@ class ProfileView(APIView):
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileSimpleView(APIView):
permission_classes = [IsAuthenticated]
@swagger_auto_schema(
operation_description="Obtenir un profil par son ID",
responses={200: ProfileSerializer}
@ -152,8 +182,12 @@ class ProfileSimpleView(APIView):
def delete(self, request, id):
return bdd.delete_object(Profile, id)
@method_decorator(csrf_exempt, name='dispatch')
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class LoginView(APIView):
permission_classes = [AllowAny]
throttle_classes = [LoginRateThrottle]
@swagger_auto_schema(
operation_description="Connexion utilisateur",
request_body=openapi.Schema(
@ -240,12 +274,14 @@ def makeToken(user):
})
# Générer le JWT avec la bonne syntaxe datetime
# jti (JWT ID) est obligatoire : SimpleJWT le vérifie via AccessToken.verify_token_id()
access_payload = {
'user_id': user.id,
'email': user.email,
'roleIndexLoginDefault': user.roleIndexLoginDefault,
'roles': roles,
'type': 'access',
'jti': str(uuid.uuid4()),
'exp': datetime.utcnow() + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME'],
'iat': datetime.utcnow(),
}
@ -255,16 +291,23 @@ def makeToken(user):
refresh_payload = {
'user_id': user.id,
'type': 'refresh',
'jti': str(uuid.uuid4()),
'exp': datetime.utcnow() + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME'],
'iat': datetime.utcnow(),
}
refresh_token = jwt.encode(refresh_payload, settings.SIMPLE_JWT['SIGNING_KEY'], algorithm=settings.SIMPLE_JWT['ALGORITHM'])
return access_token, refresh_token
except Exception as e:
logger.error(f"Erreur lors de la création du token: {str(e)}")
return None
logger.error(f"Erreur lors de la création du token: {str(e)}", exc_info=True)
# On lève l'exception pour que l'appelant (LoginView / RefreshJWTView)
# retourne une erreur HTTP 500 explicite plutôt que de crasher silencieusement
# sur le unpack d'un None.
raise
class RefreshJWTView(APIView):
permission_classes = [AllowAny]
throttle_classes = [LoginRateThrottle]
@swagger_auto_schema(
operation_description="Rafraîchir le token d'accès",
request_body=openapi.Schema(
@ -290,7 +333,6 @@ class RefreshJWTView(APIView):
))
}
)
@method_decorator(csrf_exempt, name='dispatch')
def post(self, request):
data = JSONParser().parse(request)
refresh_token = data.get("refresh")
@ -335,14 +377,16 @@ class RefreshJWTView(APIView):
return JsonResponse({'errorMessage': 'Refresh token expiré'}, status=400)
except InvalidTokenError as e:
logger.error(f"Token invalide: {str(e)}")
return JsonResponse({'errorMessage': f'Token invalide: {str(e)}'}, status=400)
return JsonResponse({'errorMessage': 'Token invalide'}, status=400)
except Exception as e:
logger.error(f"Erreur inattendue: {str(e)}")
return JsonResponse({'errorMessage': f'Erreur inattendue: {str(e)}'}, status=400)
logger.error(f"Erreur inattendue: {str(e)}", exc_info=True)
return JsonResponse({'errorMessage': 'Erreur interne du serveur'}, status=500)
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class SubscribeView(APIView):
permission_classes = [AllowAny]
@swagger_auto_schema(
operation_description="Inscription utilisateur",
manual_parameters=[
@ -430,6 +474,8 @@ class SubscribeView(APIView):
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class NewPasswordView(APIView):
permission_classes = [AllowAny]
@swagger_auto_schema(
operation_description="Demande de nouveau mot de passe",
request_body=openapi.Schema(
@ -479,6 +525,8 @@ class NewPasswordView(APIView):
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ResetPasswordView(APIView):
permission_classes = [AllowAny]
@swagger_auto_schema(
operation_description="Réinitialisation du mot de passe",
request_body=openapi.Schema(
@ -525,7 +573,9 @@ class ResetPasswordView(APIView):
return JsonResponse({'message': retour, "errorMessage": retourErreur, "errorFields": errorFields}, safe=False)
class ProfileRoleView(APIView):
permission_classes = [IsAuthenticated]
pagination_class = CustomProfilesPagination
@swagger_auto_schema(
operation_description="Obtenir la liste des profile_roles",
responses={200: ProfileRoleSerializer(many=True)}
@ -596,6 +646,8 @@ class ProfileRoleView(APIView):
@method_decorator(csrf_protect, name='dispatch')
@method_decorator(ensure_csrf_cookie, name='dispatch')
class ProfileRoleSimpleView(APIView):
permission_classes = [IsAuthenticated]
@swagger_auto_schema(
operation_description="Obtenir un profile_role par son ID",
responses={200: ProfileRoleSerializer}