import json import logging from uuid import UUID from decimal import Decimal from datetime import datetime from channels.generic.websocket import AsyncWebsocketConsumer from channels.db import database_sync_to_async from django.utils import timezone from .models import Conversation, ConversationParticipant, Message, UserPresence, MessageRead from .serializers import MessageSerializer, ConversationSerializer from Auth.models import Profile logger = logging.getLogger(__name__) def serialize_for_websocket(data): """ Convertit récursivement les objets non-sérialisables en JSON en types sérialisables """ if isinstance(data, dict): return {key: serialize_for_websocket(value) for key, value in data.items()} elif isinstance(data, list): return [serialize_for_websocket(item) for item in data] elif isinstance(data, UUID): return str(data) elif isinstance(data, Decimal): return float(data) elif isinstance(data, datetime): return data.isoformat() else: return data class ChatConsumer(AsyncWebsocketConsumer): """Consumer WebSocket pour la messagerie instantanée""" async def connect(self): self.user_id = self.scope['url_route']['kwargs']['user_id'] self.user_group_name = f'user_{self.user_id}' # Vérifier si l'utilisateur est authentifié user = self.scope.get('user') if not user or user.is_anonymous: logger.warning(f"Tentative de connexion WebSocket non authentifiée pour user_id: {self.user_id}") await self.close() return # Vérifier que l'utilisateur connecté correspond à l'user_id de l'URL if str(user.id) != str(self.user_id): logger.warning(f"Tentative d'accès WebSocket avec user_id incorrect: {self.user_id} vs {user.id}") await self.close() return self.user = user # Rejoindre le groupe utilisateur await self.channel_layer.group_add( self.user_group_name, self.channel_name ) # Rejoindre les groupes des conversations de l'utilisateur conversations = await self.get_user_conversations(self.user_id) for conversation in conversations: await self.channel_layer.group_add( f'conversation_{conversation.id}', self.channel_name ) # Mettre à jour le statut de présence presence = await self.update_user_presence(self.user_id, 'online') # Notifier les autres utilisateurs du changement de statut if presence: await self.broadcast_presence_update(self.user_id, 'online') # Envoyer les statuts de présence existants des autres utilisateurs connectés await self.send_existing_user_presences() await self.accept() logger.info(f"User {self.user_id} connected to chat") async def send_existing_user_presences(self): """Envoyer les statuts de présence existants des autres utilisateurs connectés""" try: # Obtenir toutes les conversations de cet utilisateur conversations = await self.get_user_conversations(self.user_id) # Créer un set pour éviter les doublons d'utilisateurs other_users = set() # Pour chaque conversation, récupérer les participants for conversation in conversations: participants = await self.get_conversation_participants(conversation.id) for participant in participants: if participant.id != self.user_id: other_users.add(participant.id) # Envoyer le statut de présence pour chaque utilisateur for user_id in other_users: presence = await self.get_user_presence(user_id) if presence: await self.send(text_data=json.dumps({ 'type': 'presence_update', 'user_id': str(user_id), 'status': presence.status })) except Exception as e: logger.error(f"Error sending existing user presences: {str(e)}") async def disconnect(self, close_code): # Quitter tous les groupes await self.channel_layer.group_discard( self.user_group_name, self.channel_name ) if hasattr(self, 'user'): conversations = await self.get_user_conversations(self.user_id) for conversation in conversations: await self.channel_layer.group_discard( f'conversation_{conversation.id}', self.channel_name ) # Mettre à jour le statut de présence presence = await self.update_user_presence(self.user_id, 'offline') # Notifier les autres utilisateurs du changement de statut if presence: await self.broadcast_presence_update(self.user_id, 'offline') logger.info(f"User {self.user_id} disconnected from chat") async def receive(self, text_data): """Recevoir et traiter les messages du client""" try: text_data_json = json.loads(text_data) message_type = text_data_json.get('type') if message_type == 'send_message': await self.handle_send_message(text_data_json) elif message_type == 'typing_start': await self.handle_typing_start(text_data_json) elif message_type == 'typing_stop': await self.handle_typing_stop(text_data_json) elif message_type == 'mark_as_read': await self.handle_mark_as_read(text_data_json) elif message_type == 'join_conversation': await self.handle_join_conversation(text_data_json) elif message_type == 'leave_conversation': await self.handle_leave_conversation(text_data_json) elif message_type == 'presence_update': await self.handle_presence_update(text_data_json) else: logger.warning(f"Unknown message type: {message_type}") await self.send(text_data=json.dumps({ 'type': 'error', 'message': f'Unknown message type: {message_type}' })) except json.JSONDecodeError: await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'Invalid JSON format' })) except Exception as e: logger.error(f"Error in receive: {str(e)}") await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'Internal server error' })) async def handle_send_message(self, data): """Gérer l'envoi d'un nouveau message""" conversation_id = data.get('conversation_id') content = data.get('content', '').strip() message_type = data.get('message_type', 'text') attachment = data.get('attachment') # Vérifier qu'on a soit du contenu, soit un fichier if not conversation_id or (not content and not attachment): await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'Conversation ID and content or attachment are required' })) return # Vérifier que l'utilisateur peut envoyer dans cette conversation can_send = await self.can_user_send_message(self.user_id, conversation_id) if not can_send: await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'You cannot send messages to this conversation' })) return # Créer le message avec ou sans fichier message = await self.create_message(conversation_id, self.user_id, content, message_type, attachment) if not message: await self.send(text_data=json.dumps({ 'type': 'error', 'message': 'Failed to create message' })) return # Sérialiser le message message_data = await self.serialize_message(message) # Auto-marquer comme lu pour les utilisateurs connectés (présents dans la conversation) await self.auto_mark_read_for_online_users(message, conversation_id) # Envoyer le message à tous les participants de la conversation await self.channel_layer.group_send( f'conversation_{conversation_id}', { 'type': 'chat_message', 'message': message_data } ) async def handle_typing_start(self, data): """Gérer le début de frappe""" conversation_id = data.get('conversation_id') if conversation_id: await self.update_typing_status(self.user_id, conversation_id, True) # Récupérer le nom de l'utilisateur user_name = await self.get_user_display_name(self.user_id) await self.channel_layer.group_send( f'conversation_{conversation_id}', { 'type': 'typing_status', 'user_id': str(self.user_id), 'user_name': user_name, 'is_typing': True, 'conversation_id': str(conversation_id) } ) async def handle_typing_stop(self, data): """Gérer l'arrêt de frappe""" conversation_id = data.get('conversation_id') if conversation_id: await self.update_typing_status(self.user_id, conversation_id, False) # Récupérer le nom de l'utilisateur user_name = await self.get_user_display_name(self.user_id) await self.channel_layer.group_send( f'conversation_{conversation_id}', { 'type': 'typing_status', 'user_id': str(self.user_id), 'user_name': user_name, 'is_typing': False, 'conversation_id': str(conversation_id) } ) async def handle_mark_as_read(self, data): """Marquer les messages comme lus""" conversation_id = data.get('conversation_id') if conversation_id: await self.mark_conversation_as_read(self.user_id, conversation_id) await self.channel_layer.group_send( f'conversation_{conversation_id}', { 'type': 'messages_read', 'user_id': str(self.user_id), 'conversation_id': str(conversation_id) } ) async def handle_join_conversation(self, data): """Rejoindre une conversation""" conversation_id = data.get('conversation_id') if conversation_id: await self.channel_layer.group_add( f'conversation_{conversation_id}', self.channel_name ) async def handle_leave_conversation(self, data): """Quitter une conversation""" conversation_id = data.get('conversation_id') if conversation_id: await self.channel_layer.group_discard( f'conversation_{conversation_id}', self.channel_name ) async def handle_presence_update(self, data): """Gérer les mises à jour de présence""" status = data.get('status', 'online') if status in ['online', 'offline', 'away']: await self.update_user_presence(self.user_id, status) await self.broadcast_presence_update(self.user_id, status) # Méthodes pour recevoir les messages des groupes async def chat_message(self, event): """Envoyer un message de chat au WebSocket""" message_data = serialize_for_websocket(event['message']) await self.send(text_data=json.dumps({ 'type': 'new_message', 'message': message_data })) async def typing_status(self, event): """Envoyer le statut de frappe""" # Ne pas envoyer à l'expéditeur if str(event['user_id']) != str(self.user_id): await self.send(text_data=json.dumps({ 'type': 'typing_status', 'user_id': str(event['user_id']), 'user_name': event.get('user_name', ''), 'is_typing': event['is_typing'], 'conversation_id': str(event['conversation_id']) })) async def messages_read(self, event): """Notifier que des messages ont été lus""" if str(event['user_id']) != str(self.user_id): await self.send(text_data=json.dumps({ 'type': 'messages_read', 'user_id': str(event['user_id']), 'conversation_id': str(event['conversation_id']) })) async def user_presence_update(self, event): """Notifier d'un changement de présence""" await self.send(text_data=json.dumps({ 'type': 'presence_update', 'user_id': str(event['user_id']), 'status': event['status'] })) async def new_conversation_notification(self, event): """Notifier d'une nouvelle conversation""" conversation = serialize_for_websocket(event['conversation']) conversation_id = conversation['id'] # Rejoindre automatiquement le groupe de la nouvelle conversation await self.channel_layer.group_add( f'conversation_{conversation_id}', self.channel_name ) # Envoyer la notification au client await self.send(text_data=json.dumps({ 'type': 'new_conversation', 'conversation': conversation })) # Diffuser les présences des participants de cette nouvelle conversation try: participants = await self.get_conversation_participants(conversation_id) for participant in participants: # Ne pas diffuser sa propre présence à soi-même if participant.id != self.user_id: presence = await self.get_user_presence(participant.id) if presence: await self.send(text_data=json.dumps({ 'type': 'presence_update', 'user_id': str(participant.id), 'status': presence.status })) except Exception as e: logger.error(f"Error sending presence updates for new conversation: {str(e)}") async def broadcast_presence_update(self, user_id, status): """Diffuser un changement de statut de présence à tous les utilisateurs connectés""" try: # Obtenir tous les utilisateurs qui ont des conversations avec cet utilisateur user_conversations = await self.get_user_conversations(user_id) # Créer un set pour éviter les doublons d'utilisateurs notified_users = set() # Pour chaque conversation, notifier tous les participants for conversation in user_conversations: participants = await self.get_conversation_participants(conversation.id) for participant in participants: if participant.id != user_id and participant.id not in notified_users: notified_users.add(participant.id) # Envoyer la notification au groupe utilisateur await self.channel_layer.group_send( f'user_{participant.id}', { 'type': 'user_presence_update', 'user_id': user_id, 'status': status } ) logger.info(f"Broadcasted presence update for user {user_id} ({status}) to {len(notified_users)} users") except Exception as e: logger.error(f"Error broadcasting presence update: {str(e)}") # Méthodes d'accès aux données (database_sync_to_async) @database_sync_to_async def get_user(self, user_id): try: return Profile.objects.get(id=user_id) except Profile.DoesNotExist: return None @database_sync_to_async def get_user_display_name(self, user_id): """Obtenir le nom d'affichage d'un utilisateur""" try: user = Profile.objects.get(id=user_id) if user.first_name and user.last_name: return f"{user.first_name} {user.last_name}" elif user.first_name: return user.first_name elif user.last_name: return user.last_name else: return user.email or f"Utilisateur {user_id}" except Profile.DoesNotExist: return f"Utilisateur {user_id}" @database_sync_to_async def get_user_conversations(self, user_id): return list(Conversation.objects.filter( participants__participant_id=user_id, participants__is_active=True, is_active=True ).distinct()) @database_sync_to_async def get_conversation_participants(self, conversation_id): """Obtenir tous les participants d'une conversation""" return list(Profile.objects.filter( conversation_participants__conversation_id=conversation_id, conversation_participants__is_active=True )) @database_sync_to_async def get_conversations_data(self, user_id): try: user = Profile.objects.get(id=user_id) conversations = Conversation.objects.filter( participants__participant=user, participants__is_active=True, is_active=True ).distinct() serializer = ConversationSerializer(conversations, many=True, context={'user': user}) return serializer.data except Exception as e: logger.error(f"Error getting conversations data: {str(e)}") return [] @database_sync_to_async def can_user_send_message(self, user_id, conversation_id): return ConversationParticipant.objects.filter( conversation_id=conversation_id, participant_id=user_id, is_active=True ).exists() @database_sync_to_async def create_message(self, conversation_id, sender_id, content, message_type, attachment=None): try: conversation = Conversation.objects.get(id=conversation_id) sender = Profile.objects.get(id=sender_id) message_data = { 'conversation': conversation, 'sender': sender, 'content': content, 'message_type': message_type } # Ajouter les informations du fichier si présent if attachment: message_data.update({ 'file_url': attachment.get('fileUrl'), 'file_name': attachment.get('fileName'), 'file_size': attachment.get('fileSize'), 'file_type': attachment.get('fileType'), }) # Si c'est un fichier, s'assurer que le type de message est correct if attachment.get('fileType', '').startswith('image/'): message_data['message_type'] = 'image' else: message_data['message_type'] = 'file' message = Message.objects.create(**message_data) # Mettre à jour l'activité de la conversation conversation.last_activity = message.created_at conversation.save(update_fields=['last_activity']) return message except Exception as e: logger.error(f"Error creating message: {str(e)}") return None @database_sync_to_async def serialize_message(self, message): serializer = MessageSerializer(message) return serialize_for_websocket(serializer.data) @database_sync_to_async def get_user_presence(self, user_id): """Récupérer la présence d'un utilisateur""" try: return UserPresence.objects.get(user_id=user_id) except UserPresence.DoesNotExist: return None @database_sync_to_async def update_user_presence(self, user_id, status): try: user = Profile.objects.get(id=user_id) presence, created = UserPresence.objects.get_or_create(user=user) old_status = presence.status presence.status = status presence.save() # Si le statut a changé, notifier les autres utilisateurs if old_status != status or created: logger.info(f"User {user_id} presence changed from {old_status} to {status}") return presence except Exception as e: logger.error(f"Error updating user presence: {str(e)}") return None @database_sync_to_async def update_typing_status(self, user_id, conversation_id, is_typing): try: user = Profile.objects.get(id=user_id) presence, created = UserPresence.objects.get_or_create(user=user) if is_typing: conversation = Conversation.objects.get(id=conversation_id) presence.is_typing_in = conversation else: presence.is_typing_in = None presence.save() except Exception as e: logger.error(f"Error updating typing status: {str(e)}") @database_sync_to_async def mark_conversation_as_read(self, user_id, conversation_id): """Marquer tous les messages non lus d'une conversation comme lus""" try: # Mettre à jour le last_read_at du participant participant = ConversationParticipant.objects.get( conversation_id=conversation_id, participant_id=user_id ) current_time = timezone.now() participant.last_read_at = current_time participant.save(update_fields=['last_read_at']) # Créer des enregistrements MessageRead pour tous les messages non lus # que l'utilisateur n'a pas encore explicitement lus unread_messages = Message.objects.filter( conversation_id=conversation_id, created_at__lte=current_time, is_deleted=False ).exclude( sender_id=user_id # Exclure ses propres messages ).exclude( read_by__participant_id=user_id # Exclure les messages déjà marqués comme lus ) # Créer les enregistrements MessageRead en batch message_reads = [ MessageRead(message=message, participant_id=user_id, read_at=current_time) for message in unread_messages ] if message_reads: MessageRead.objects.bulk_create(message_reads, ignore_conflicts=True) logger.info(f"Marked {len(message_reads)} messages as read for user {user_id} in conversation {conversation_id}") except Exception as e: logger.error(f"Error marking conversation as read: {str(e)}") @database_sync_to_async def auto_mark_read_for_online_users(self, message, conversation_id): """Auto-marquer comme lu pour les utilisateurs en ligne dans la conversation""" try: # Obtenir tous les participants de la conversation (synchrone) participants = ConversationParticipant.objects.filter( conversation_id=conversation_id, is_active=True ).exclude(participant_id=message.sender.id) # Obtenir l'heure de création du message message_time = message.created_at # Préparer les enregistrements MessageRead à créer message_reads = [] for participant_obj in participants: participant = participant_obj.participant # Vérifier si l'utilisateur est en ligne (synchrone) try: presence = UserPresence.objects.filter(user=participant).first() if presence and presence.status == 'online': # Vérifier qu'il n'existe pas déjà un enregistrement MessageRead if not MessageRead.objects.filter(message=message, participant=participant).exists(): message_reads.append(MessageRead( message=message, participant=participant, read_at=message_time )) except: # En cas d'erreur de présence, ne pas marquer comme lu continue # Créer les enregistrements MessageRead en batch if message_reads: MessageRead.objects.bulk_create(message_reads, ignore_conflicts=True) logger.info(f"Auto-marked {len(message_reads)} messages as read for online users in conversation {conversation_id}") except Exception as e: logger.error(f"Error in auto_mark_read_for_online_users: {str(e)}")