fix: Mise en place de l'auto reload pour Daphne [#65]

This commit is contained in:
N3WT DE COMPET
2025-06-05 19:55:35 +02:00
parent 0064b8d35a
commit 7f002e2e6a
2 changed files with 52 additions and 17 deletions

View File

@ -66,6 +66,7 @@ urllib3==2.2.3
vine==5.1.0
wcwidth==0.2.13
webencodings==0.5.1
watchfiles
xhtml2pdf==0.2.16
channels==4.0.0
channels-redis==4.1.0

View File

@ -1,5 +1,6 @@
import subprocess
import os
from watchfiles import run_process
def run_command(command):
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -11,6 +12,7 @@ def run_command(command):
return process.returncode
test_mode = os.getenv('test_mode', 'false').lower() == 'true'
watch_mode = os.getenv('DJANGO_WATCH', 'false').lower() == 'true'
commands = [
["python", "manage.py", "collectstatic", "--noinput"],
@ -32,23 +34,55 @@ test_commands = [
["python", "manage.py", "init_mock_datas"]
]
for command in commands:
def run_daphne():
try:
result = subprocess.run([
"daphne", "-b", "0.0.0.0", "-p", "8080", "N3wtSchool.asgi:application"
])
return result.returncode
except KeyboardInterrupt:
print("Arrêt de Daphne (KeyboardInterrupt)")
return 0
if __name__ == "__main__":
for command in commands:
if run_command(command) != 0:
exit(1)
#if test_mode:
# for test_command in test_commands:
# if run_command(test_command) != 0:
# exit(1)
#if test_mode:
# for test_command in test_commands:
# if run_command(test_command) != 0:
# exit(1)
# Lancer les processus en parallèle
processes = [
subprocess.Popen(["daphne", "-b", "0.0.0.0", "-p", "8080", "N3wtSchool.asgi:application"]),
if watch_mode:
celery_worker = subprocess.Popen(["celery", "-A", "N3wtSchool", "worker", "--loglevel=info"])
celery_beat = subprocess.Popen(["celery", "-A", "N3wtSchool", "beat", "--loglevel=info", "--scheduler", "django_celery_beat.schedulers:DatabaseScheduler"])
try:
run_process(
'.',
target=run_daphne
)
except KeyboardInterrupt:
print("Arrêt demandé (KeyboardInterrupt)")
finally:
celery_worker.terminate()
celery_beat.terminate()
celery_worker.wait()
celery_beat.wait()
else:
processes = [
subprocess.Popen([
"daphne", "-b", "0.0.0.0", "-p", "8080", "N3wtSchool.asgi:application"
]),
subprocess.Popen(["celery", "-A", "N3wtSchool", "worker", "--loglevel=info"]),
subprocess.Popen(["celery", "-A", "N3wtSchool", "beat", "--loglevel=info", "--scheduler", "django_celery_beat.schedulers:DatabaseScheduler"])
]
# Attendre la fin des processus
for process in processes:
]
try:
for process in processes:
process.wait()
except KeyboardInterrupt:
print("Arrêt demandé (KeyboardInterrupt)")
for process in processes:
process.terminate()
for process in processes:
process.wait()