from fastapi import APIRouter, HTTPException
from typing import Dict, Any
import os
import time
from tasks.create_bench_config_file import CreateBenchConfigTask
from tasks.create_bench import CreateBenchTask
router = APIRouter(tags=["benchmark"])
# Store active tasks by session_id (imported in main.py)
active_tasks = {}
# Reference to session_files (provided by main.py)
# This declaration is overwritten by the assignment in __init__.py
session_files = {}
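# A minimal wiring sketch (assumption, not shown in this file): main.py is
# expected to attach the shared session_files mapping to the router before
# the endpoints below read router.session_files, e.g.:
#
#   from routes import benchmark
#   benchmark.router.session_files = session_files  # hypothetical name in main.py
#   app.include_router(benchmark.router)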
@router.post("/generate-benchmark")
async def generate_benchmark(data: Dict[str, Any]):
"""
Generate a benchmark configuration and run the ingestion process
Args:
data: Dictionary containing session_id
Returns:
Dictionary with logs and status
"""
session_id = data.get("session_id")
# Debug to check session_files and received session_id
print(f"DEBUG: Session ID received: {session_id}")
print(f"DEBUG: Available session files: {list(router.session_files.keys())}")
if not session_id or session_id not in router.session_files:
return {"error": "Invalid or missing session ID"}
    # Check whether a benchmark is already running or completed for this session
if session_id in active_tasks:
task = active_tasks[session_id]
        # If the benchmark has already finished, return the existing logs
if task.is_task_completed():
return {
"status": "already_completed",
"logs": task.get_logs(),
"is_completed": True
}
        # If the benchmark is still running, return the current logs
else:
return {
"status": "already_running",
"logs": task.get_logs(),
"is_completed": False
}
file_path = router.session_files[session_id]
all_logs = []
try:
        # Initialize the task that will manage the entire process
task = UnifiedBenchmarkTask(session_uid=session_id)
        # Keep a reference so logs can be retrieved later
active_tasks[session_id] = task
        # Start the benchmark process
task.run(file_path)
        # Retrieve the initial logs
all_logs = task.get_logs()
return {
"status": "running",
"logs": all_logs
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"logs": all_logs
}
@router.get("/benchmark-progress/{session_id}")
async def get_benchmark_progress(session_id: str):
"""
Get the logs and status for a running benchmark task
Args:
session_id: Session ID for the task
Returns:
Dictionary with logs and completion status
"""
if session_id not in active_tasks:
raise HTTPException(status_code=404, detail="Benchmark task not found")
task = active_tasks[session_id]
logs = task.get_logs()
is_completed = task.is_task_completed()
return {
"logs": logs,
"is_completed": is_completed
}
# Class that unifies the whole benchmark process
class UnifiedBenchmarkTask:
"""
Task that handles the entire benchmark process from configuration to completion
"""
def __init__(self, session_uid: str):
"""
Initialize the unified benchmark task
Args:
session_uid: Session ID for this task
"""
self.session_uid = session_uid
self.logs = []
self.is_completed = False
self.config_task = None
self.bench_task = None
self._add_log("[INFO] Initializing benchmark task")
def _add_log(self, message: str):
"""
Add a log message
Args:
message: Log message to add
"""
if message not in self.logs: # Avoid duplicates
self.logs.append(message)
# Force a copy to avoid reference problems
self.logs = self.logs.copy()
print(f"[{self.session_uid}] {message}")
def get_logs(self):
"""
Get all logs
Returns:
List of log messages
"""
return self.logs.copy()
def is_task_completed(self):
"""
Check if the task is completed
Returns:
True if completed, False otherwise
"""
return self.is_completed
def run(self, file_path: str):
"""
Run the benchmark process
Args:
file_path: Path to the uploaded file
"""
# Start in a separate thread to avoid blocking
import threading
thread = threading.Thread(target=self._run_process, args=(file_path,))
thread.daemon = True
thread.start()
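        # NOTE: daemon threads are terminated abruptly when the process exits;
        # progress is only surfaced through get_logs() / is_task_completed(),
        # which the /benchmark-progress endpoint polls.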
def _run_process(self, file_path: str):
"""
Internal method to run the process
Args:
file_path: Path to the uploaded file
"""
try:
# Step 1: Configuration
self._add_log("[INFO] Starting configuration process")
self.config_task = CreateBenchConfigTask(session_uid=self.session_uid)
# Execute the configuration task
try:
config_path = self.config_task.run(file_path=file_path)
# Get configuration logs
config_logs = self.config_task.get_logs()
for log in config_logs:
self._add_log(log)
# Mark configuration step as completed
if "[SUCCESS] Stage completed: config_generation" not in self.logs:
self._add_log("[SUCCESS] Stage completed: configuration")
# Step 2: Benchmark
self._add_log("[INFO] Starting benchmark process")
self.bench_task = CreateBenchTask(session_uid=self.session_uid, config_path=config_path)
# Run the benchmark task
self.bench_task.run()
# Wait for the benchmark task to complete
while not self.bench_task.is_task_completed():
# Get new logs and add them
bench_logs = self.bench_task.get_logs()
for log in bench_logs:
self._add_log(log)
time.sleep(1)
# Get final logs
final_logs = self.bench_task.get_logs()
for log in final_logs:
self._add_log(log)
# Mark as completed
self.is_completed = True
                # Check whether an error was detected in the benchmark logs
has_error = any("[ERROR]" in log for log in final_logs)
benchmark_terminated_with_error = any("Benchmark process terminated with error code" in log for log in final_logs)
benchmark_already_marked_success = any("Benchmark process completed successfully" in log for log in final_logs)
                # Only add the success message if no error was detected
if not has_error and not benchmark_terminated_with_error and not benchmark_already_marked_success:
self._add_log("[SUCCESS] Benchmark process completed successfully")
except Exception as config_error:
error_msg = str(config_error)
# Log detailed error
self._add_log(f"[ERROR] Configuration failed: {error_msg}")
# Check if it's a provider error and provide a more user-friendly message
if "Required models not available" in error_msg:
self._add_log("[ERROR] Some required models are not available at the moment. Please try again later.")
# Mark as completed with error
self.is_completed = True
except Exception as e:
self._add_log(f"[ERROR] Benchmark process failed: {str(e)}")
            self.is_completed = True