Files
n3wt-school/Back-End/start.py
2025-11-30 17:24:25 +01:00

114 lines
4.0 KiB
Python

import subprocess
import os
from watchfiles import run_process
def run_command(command):
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate(input=b"y\n")
if process.returncode != 0:
print(f"Error running command: {' '.join(command)}")
print(f"stdout: {stdout.decode()}")
print(f"stderr: {stderr.decode()}")
return process.returncode
test_mode = os.getenv('test_mode', 'false').lower() == 'true'
flush_data = os.getenv('flush_data', 'false').lower() == 'true'
migrate_data = os.getenv('migrate_data', 'false').lower() == 'true'
watch_mode = os.getenv('DJANGO_WATCH', 'false').lower() == 'true'
collect_static_cmd = [
["python", "manage.py", "collectstatic", "--noinput"]
]
flush_data_cmd = [
["python", "manage.py", "flush", "--noinput"]
]
migrate_commands = [
["python", "manage.py", "makemigrations", "Common", "--noinput"],
["python", "manage.py", "makemigrations", "Establishment", "--noinput"],
["python", "manage.py", "makemigrations", "Settings", "--noinput"],
["python", "manage.py", "makemigrations", "Subscriptions", "--noinput"],
["python", "manage.py", "makemigrations", "Planning", "--noinput"],
["python", "manage.py", "makemigrations", "GestionNotification", "--noinput"],
["python", "manage.py", "makemigrations", "GestionEmail", "--noinput"],
["python", "manage.py", "makemigrations", "GestionMessagerie", "--noinput"],
["python", "manage.py", "makemigrations", "Auth", "--noinput"],
["python", "manage.py", "makemigrations", "School", "--noinput"]
]
commands = [
["python", "manage.py", "migrate", "--noinput"]
]
test_commands = [
["python", "manage.py", "init_mock_datas"]
]
def run_daphne():
try:
result = subprocess.run([
"daphne", "-b", "0.0.0.0", "-p", "8080", "N3wtSchool.asgi:application"
])
return result.returncode
except KeyboardInterrupt:
print("Arrêt de Daphne (KeyboardInterrupt)")
return 0
if __name__ == "__main__":
for command in collect_static_cmd:
if run_command(command) != 0:
exit(1)
if flush_data:
for command in flush_data_cmd:
if run_command(command) != 0:
exit(1)
if migrate_data:
for command in migrate_commands:
if run_command(command) != 0:
exit(1)
for command in commands:
if run_command(command) != 0:
exit(1)
if test_mode:
for test_command in test_commands:
if run_command(test_command) != 0:
exit(1)
if watch_mode:
celery_worker = subprocess.Popen(["celery", "-A", "N3wtSchool", "worker", "--loglevel=info"])
celery_beat = subprocess.Popen(["celery", "-A", "N3wtSchool", "beat", "--loglevel=info", "--scheduler", "django_celery_beat.schedulers:DatabaseScheduler"])
try:
run_process(
'.',
target=run_daphne
)
except KeyboardInterrupt:
print("Arrêt demandé (KeyboardInterrupt)")
finally:
celery_worker.terminate()
celery_beat.terminate()
celery_worker.wait()
celery_beat.wait()
else:
processes = [
subprocess.Popen([
"daphne", "-b", "0.0.0.0", "-p", "8080", "N3wtSchool.asgi:application"
]),
subprocess.Popen(["celery", "-A", "N3wtSchool", "worker", "--loglevel=info"]),
subprocess.Popen(["celery", "-A", "N3wtSchool", "beat", "--loglevel=info", "--scheduler", "django_celery_beat.schedulers:DatabaseScheduler"])
]
try:
for process in processes:
process.wait()
except KeyboardInterrupt:
print("Arrêt demandé (KeyboardInterrupt)")
for process in processes:
process.terminate()
for process in processes:
process.wait()