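"""FastAPI router for the benchmark generation endpoints.

Exposes a POST endpoint that starts the benchmark process for an uploaded
file session and a GET endpoint that polls the logs and completion status
of a running task.
"""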
from fastapi import APIRouter, HTTPException
from typing import Dict, Any
import os
import time
from tasks.create_bench_config_file import CreateBenchConfigTask
from tasks.create_bench import CreateBenchTask
router = APIRouter(tags=["benchmark"])
# Store active tasks by session_id (imported in main.py)
active_tasks = {}

# Reference to session_files (will be provided by main.py).
# This declaration is overwritten by the assignment done in __init__.py.
session_files = {}
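# NOTE: the handlers below read ``router.session_files`` rather than the
# module-level dict above, so the owning application is expected to attach
# the shared mapping to the router object, e.g. (illustrative only):
#   router.session_files = session_files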
@router.post("/generate-benchmark")
async def generate_benchmark(data: Dict[str, Any]):
"""
Generate a benchmark configuration and run the ingestion process
Args:
data: Dictionary containing session_id
Returns:
Dictionary with logs and status
"""
session_id = data.get("session_id")
# Débogage pour vérifier les session_files et le session_id reçu
print(f"DEBUG: Session ID reçu: {session_id}")
print(f"DEBUG: Session files disponibles: {list(router.session_files.keys())}")
if not session_id or session_id not in router.session_files:
return {"error": "Invalid or missing session ID"}
file_path = router.session_files[session_id]
all_logs = []
try:
# Initialiser la tâche qui gérera tout le processus
task = UnifiedBenchmarkTask(session_uid=session_id)
# Stockage pour récupération ultérieure des logs
active_tasks[session_id] = task
# Démarrer le processus de benchmark
task.run(file_path)
# Récupérer les logs initiaux
all_logs = task.get_logs()
return {
"status": "running",
"logs": all_logs
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"logs": all_logs
}
@router.get("/benchmark-progress/{session_id}")
async def get_benchmark_progress(session_id: str):
"""
Get the logs and status for a running benchmark task
Args:
session_id: Session ID for the task
Returns:
Dictionary with logs and completion status
"""
if session_id not in active_tasks:
raise HTTPException(status_code=404, detail="Benchmark task not found")
task = active_tasks[session_id]
logs = task.get_logs()
is_completed = task.is_task_completed()
return {
"logs": logs,
"is_completed": is_completed
}
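# Example usage from a client (assuming the router is mounted at the
# application root, with no path prefix):
#   POST /generate-benchmark                with JSON body {"session_id": "<id>"}
#   GET  /benchmark-progress/<session_id>   to poll logs and completion status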
# Create a class that unifies the whole benchmark process
class UnifiedBenchmarkTask:
    """
    Task that handles the entire benchmark process from configuration to completion
    """

    def __init__(self, session_uid: str):
        """
        Initialize the unified benchmark task.

        Args:
            session_uid: Session ID for this task
        """
        self.session_uid = session_uid
        self.logs = []
        self.is_completed = False
        self.config_task = None
        self.bench_task = None

        self._add_log("[INFO] Initializing benchmark task")
    def _add_log(self, message: str):
        """
        Add a log message.

        Args:
            message: Log message to add
        """
        if message not in self.logs:  # Avoid duplicates
            self.logs.append(message)
            # Force a copy to avoid reference issues
            self.logs = self.logs.copy()
            print(f"[{self.session_uid}] {message}")
    def get_logs(self):
        """
        Get all logs.

        Returns:
            List of log messages
        """
        return self.logs.copy()

    def is_task_completed(self):
        """
        Check if the task is completed.

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed
    def run(self, file_path: str):
        """
        Run the benchmark process.

        Args:
            file_path: Path to the uploaded file
        """
        # Run in a separate thread so the request handler is not blocked
        import threading

        thread = threading.Thread(target=self._run_process, args=(file_path,))
        thread.daemon = True
        thread.start()
    def _run_process(self, file_path: str):
        """
        Internal method to run the process.

        Args:
            file_path: Path to the uploaded file
        """
        try:
            # Step 1: configuration
            self._add_log("[INFO] Starting configuration process")
            self.config_task = CreateBenchConfigTask(session_uid=self.session_uid)

            # Run the configuration task
            config_path = self.config_task.run(file_path=file_path)

            # Collect the configuration logs
            config_logs = self.config_task.get_logs()
            for log in config_logs:
                self._add_log(log)

            # Mark the configuration stage as completed
            if "[SUCCESS] Stage completed: config_generation" not in self.logs:
                self._add_log("[SUCCESS] Stage completed: configuration")

            # Step 2: benchmark
            self._add_log("[INFO] Starting benchmark process")
            self.bench_task = CreateBenchTask(session_uid=self.session_uid, config_path=config_path)

            # Run the benchmark task
            self.bench_task.run()

            # Wait for the benchmark task to finish
            while not self.bench_task.is_task_completed():
                # Collect and append any new logs
                bench_logs = self.bench_task.get_logs()
                for log in bench_logs:
                    self._add_log(log)
                time.sleep(1)

            # Collect the final logs
            final_logs = self.bench_task.get_logs()
            for log in final_logs:
                self._add_log(log)

            # Mark the whole process as completed
            self.is_completed = True
            self._add_log("[SUCCESS] Benchmark process completed successfully")
        except Exception as e:
            self._add_log(f"[ERROR] Benchmark process failed: {str(e)}")
            self.is_completed = True