mirror of
https://git.v0id.ovh/n3wt-innov/n3wt-school.git
synced 2026-04-03 16:51:26 +00:00
275 lines
11 KiB
Python
275 lines
11 KiB
Python
"""
|
|
Tests de sécurité — GestionMessagerie
|
|
Vérifie :
|
|
- Protection IDOR : un utilisateur ne peut pas lire/écrire au nom d'un autre
|
|
- Authentification requise sur tous les endpoints
|
|
- L'expéditeur d'un message est toujours l'utilisateur authentifié
|
|
- Le mark-as-read utilise request.user (pas user_id du body)
|
|
- L'upload de fichier utilise request.user (pas sender_id du body)
|
|
"""
|
|
|
|
import json
|
|
import uuid
|
|
|
|
from django.test import TestCase, override_settings
|
|
from django.urls import reverse
|
|
from rest_framework import status
|
|
from rest_framework.test import APIClient
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
|
|
from Auth.models import Profile, ProfileRole
|
|
from Establishment.models import Establishment
|
|
from GestionMessagerie.models import (
|
|
Conversation, ConversationParticipant, Message
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_establishment(name="Ecole Sécurité"):
    """Create and return an ``Establishment`` populated with fixed test values.

    Args:
        name: Name given to the establishment; every other field is a
            hard-coded fixture value.

    Returns:
        The object returned by ``Establishment.objects.create``.
    """
    fixture_fields = {
        "name": name,
        "address": "1 rue des Tests",
        "total_capacity": 50,
        "establishment_type": [1],
    }
    return Establishment.objects.create(**fixture_fields)
|
|
|
|
|
|
def create_user(email, password="TestPass!123"):
    """Create and return a ``Profile`` whose username and email are both *email*.

    Args:
        email: Used as both the username and the email address.
        password: Password handed to ``create_user``.

    Returns:
        The object returned by ``Profile.objects.create_user``.
    """
    return Profile.objects.create_user(username=email, email=email, password=password)
|
|
|
|
|
|
def create_active_user(email, password="TestPass!123"):
    """Create a user plus an active school role in a dedicated establishment.

    A fresh establishment named ``"Ecole de <email>"`` is created for each
    call and linked to the user through an active ``PROFIL_ECOLE`` role.

    Args:
        email: Email (and username) of the new user.
        password: Password for the new user.

    Returns:
        The newly created user.
    """
    profile = create_user(email, password)
    school = create_establishment(name=f"Ecole de {email}")
    role_fields = {
        "profile": profile,
        "role_type": ProfileRole.RoleType.PROFIL_ECOLE,
        "establishment": school,
        "is_active": True,
    }
    ProfileRole.objects.create(**role_fields)
    return profile
|
|
|
|
|
|
def get_token(user):
    """Return, as a string, the access token of a new refresh token for *user*."""
    refresh = RefreshToken.for_user(user)
    return str(refresh.access_token)
|
|
|
|
|
|
def create_conversation_with_participant(user1, user2):
    """Create a private conversation between two users.

    Both users are registered as active participants, ``user1`` first.

    Returns:
        The newly created ``Conversation``.
    """
    conversation = Conversation.objects.create(conversation_type='private')
    for member in (user1, user2):
        ConversationParticipant.objects.create(
            conversation=conversation,
            participant=member,
            is_active=True,
        )
    return conversation
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration commune
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Settings overrides applied to every test class in this module via
# ``@override_settings(**OVERRIDE)``: a local-memory cache, database-backed
# sessions and an in-memory channel layer.
OVERRIDE = {
    'CACHES': {
        'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'},
    },
    'SESSION_ENGINE': 'django.contrib.sessions.backends.db',
    'CHANNEL_LAYERS': {
        'default': {'BACKEND': 'channels.layers.InMemoryChannelLayer'},
    },
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests : authentification requise
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@override_settings(**OVERRIDE)
class MessagerieAuthRequiredTest(TestCase):
    """Every messaging endpoint must reject unauthenticated requests."""

    def setUp(self):
        # No credentials are ever attached to this client.
        self.client = APIClient()

    def _post_json(self, route_name, body):
        """POST *body* serialized as JSON to the named route."""
        return self.client.post(
            reverse(route_name),
            data=json.dumps(body),
            content_type='application/json',
        )

    def _assert_unauthorized(self, response):
        """Assert that *response* carries HTTP 401."""
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_conversations_sans_auth_retourne_401(self):
        """Listing conversations without credentials returns 401."""
        endpoint = reverse('GestionMessagerie:conversations')
        self._assert_unauthorized(self.client.get(endpoint))

    def test_send_message_sans_auth_retourne_401(self):
        """Sending a message without credentials returns 401."""
        body = {'conversation_id': str(uuid.uuid4()), 'content': 'Hello'}
        self._assert_unauthorized(
            self._post_json('GestionMessagerie:send_message', body)
        )

    def test_mark_as_read_sans_auth_retourne_401(self):
        """Marking a conversation as read without credentials returns 401."""
        body = {'conversation_id': str(uuid.uuid4())}
        self._assert_unauthorized(
            self._post_json('GestionMessagerie:mark_as_read', body)
        )

    def test_upload_file_sans_auth_retourne_401(self):
        """Uploading a file without credentials returns 401."""
        endpoint = reverse('GestionMessagerie:upload_file')
        self._assert_unauthorized(self.client.post(endpoint))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests IDOR : liste des conversations (request.user ignorant l'URL user_id)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@override_settings(**OVERRIDE)
class ConversationListIDORTest(TestCase):
    """
    GET conversations/user/<user_id>/ must return the conversations of
    request.user, not those of the user whose ID appears in the URL.
    """

    def setUp(self):
        self.client = APIClient()
        self.alice = create_active_user('alice@test.com')
        self.bob = create_active_user('bob@test.com')
        self.carol = create_active_user('carol@test.com')

        # Conversation between Alice and Bob (Carol must not see it).
        self.conv_alice_bob = create_conversation_with_participant(self.alice, self.bob)

    def _fetch_conversations(self, authenticated_user, url_user):
        """Authenticate as *authenticated_user* and GET the list for *url_user*'s ID."""
        bearer = get_token(authenticated_user)
        self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {bearer}')
        endpoint = reverse(
            'GestionMessagerie:conversations_by_user',
            kwargs={'user_id': url_user.id},
        )
        return self.client.get(endpoint)

    def test_carol_ne_voit_pas_les_conversations_de_alice(self):
        """
        Carol authenticates but puts alice.id in the URL.
        She must get her own (empty) conversation list, not Alice's.
        """
        response = self._fetch_conversations(self.carol, self.alice)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        conversations = response.json()
        # Carol has no conversation at all: the list must be empty.
        self.assertEqual(
            len(conversations),
            0,
            "Carol ne doit pas voir les conversations d'Alice (IDOR)",
        )

    def test_alice_voit_ses_propres_conversations(self):
        """Alice does see her conversation with Bob."""
        response = self._fetch_conversations(self.alice, self.alice)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        conversations = response.json()
        self.assertEqual(len(conversations), 1)
        self.assertEqual(conversations[0]['id'], str(self.conv_alice_bob.id))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests IDOR : envoi de message (sender = request.user)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@override_settings(**OVERRIDE)
class SendMessageIDORTest(TestCase):
    """
    POST send-message/ must use request.user as the sender, regardless of
    any sender_id supplied in the body.
    """

    def setUp(self):
        self.client = APIClient()
        self.alice = create_active_user('alice_msg@test.com')
        self.bob = create_active_user('bob_msg@test.com')
        self.conv = create_conversation_with_participant(self.alice, self.bob)

    def _send_as(self, user, body):
        """Authenticate as *user* and POST *body* as JSON to send_message."""
        bearer = get_token(user)
        self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {bearer}')
        return self.client.post(
            reverse('GestionMessagerie:send_message'),
            data=json.dumps(body),
            content_type='application/json',
        )

    def test_sender_id_dans_body_est_ignore(self):
        """
        Bob sends a message with alice.id as sender_id in the body.
        The stored message must have Bob as its sender.
        """
        forged_body = {
            'conversation_id': str(self.conv.id),
            'sender_id': self.alice.id,  # impersonation attempt
            'content': 'Message imposteur',
        }
        response = self._send_as(self.bob, forged_body)
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)

        # The sender must be Bob, not Alice.
        stored = Message.objects.get(conversation=self.conv, content='Message imposteur')
        self.assertEqual(
            stored.sender.id,
            self.bob.id,
            "L'expéditeur doit être request.user (Bob), pas le sender_id du body (Alice)",
        )

    def test_non_participant_ne_peut_pas_envoyer(self):
        """
        Carol (not a participant) cannot send into the Alice-Bob conversation.
        """
        carol = create_active_user('carol_msg@test.com')
        intruder_body = {
            'conversation_id': str(self.conv.id),
            'content': 'Message intrus',
        }
        response = self._send_as(carol, intruder_body)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests IDOR : mark-as-read (request.user, pas user_id du body)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@override_settings(**OVERRIDE)
class MarkAsReadIDORTest(TestCase):
    """
    POST mark-as-read must use request.user, not the user_id from the body.
    Carol cannot mark one of Alice's conversations as read.
    """

    def setUp(self):
        self.client = APIClient()
        self.alice = create_active_user('alice_read@test.com')
        self.bob = create_active_user('bob_read@test.com')
        self.carol = create_active_user('carol_read@test.com')
        # Carol is deliberately NOT a participant of this conversation.
        self.conv = create_conversation_with_participant(self.alice, self.bob)

    def test_carol_ne_peut_pas_marquer_conversation_alice_comme_lue(self):
        """
        Carol targets the Alice-Bob conversation and puts alice.id in the body,
        but she is not a participant. The request must be rejected (404 expected:
        no ConversationParticipant exists for Carol).
        """
        token = get_token(self.carol)
        self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {token}')
        # The conversation id must actually reach the view, otherwise the
        # request is rejected for a missing parameter and the IDOR check is
        # never exercised. It is sent both in the body (same shape as the
        # unauthenticated mark_as_read test) and in the query string.
        payload = {
            'conversation_id': str(self.conv.id),
            'user_id': self.alice.id,  # IDOR attempt
        }
        url = reverse('GestionMessagerie:mark_as_read') + f'?conversation_id={self.conv.id}'
        response = self.client.post(
            url,
            data=json.dumps(payload),
            content_type='application/json',
        )
        # Must fail: the participant lookup is done for request.user (Carol),
        # who is not part of the conversation.
        self.assertIn(
            response.status_code,
            [status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST],
        )

    def test_alice_peut_marquer_sa_propre_conversation(self):
        """Alice, once authenticated, is allowed to reach the mark-as-read view."""
        token = get_token(self.alice)
        self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {token}')
        response = self.client.post(
            reverse('GestionMessagerie:mark_as_read'),
            data=json.dumps({}),
            content_type='application/json',
        )
        # No conversation_id is sent, so a 404 is expected; the point is only
        # that access to the view itself is not denied (no 401/403).
        self.assertNotIn(
            response.status_code,
            [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN],
        )
|