Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
from fastapi import APIRouter, HTTPException | |
from typing import Dict, Any | |
import os | |
import time | |
from tasks.createBenchConfigFile import CreateBenchConfigTask | |
from tasks.createBench import CreateBenchTask | |
router = APIRouter(tags=["benchmark"])

# Active tasks keyed by session_id (imported by main.py).
active_bench_tasks: Dict[str, CreateBenchTask] = {}
active_config_tasks: Dict[str, CreateBenchConfigTask] = {}

# Reference to the session files mapping (to be supplied by main.py).
# This placeholder declaration is overwritten by the assignment in __init__.py.
session_files: Dict[str, Any] = {}
async def generate_benchmark(data: Dict[str, Any]):
    """
    Generate a benchmark configuration for a previously registered session.

    Args:
        data: Dictionary containing ``session_id``.

    Returns:
        ``{"error": ...}`` when the session ID is missing or unknown.
        Otherwise a dictionary with ``status`` (``"running"`` or ``"error"``),
        ``logs``, and either ``config_path`` (success) or ``error`` (failure).
    """
    session_id = data.get("session_id")

    # The mapping is normally attached to the router from outside this module.
    # Fall back to the module-level placeholder so that a missing attribute
    # yields the regular "invalid session" answer instead of an AttributeError.
    known_files = getattr(router, "session_files", session_files)

    # Debug output to inspect the received session_id and the known sessions.
    print(f"DEBUG: Session ID reçu: {session_id}")
    print(f"DEBUG: Session files disponibles: {list(known_files.keys())}")

    try:
        is_known = bool(session_id) and session_id in known_files
    except TypeError:
        # An unhashable JSON value (list/dict) can never be a valid key.
        is_known = False
    if not is_known:
        return {"error": "Invalid or missing session ID"}

    file_path = known_files[session_id]
    all_logs = []

    try:
        # Step 1: generate the configuration file.
        config_task = CreateBenchConfigTask(session_uid=session_id)

        # Keep the task around so get_config_logs can poll it later.
        active_config_tasks[session_id] = config_task

        # Kick off configuration generation for the uploaded file.
        config_path = config_task.run(file_path=file_path)

        # Include the logs available right after the task was launched.
        all_logs.extend(config_task.get_logs())

        # Step 2 (benchmark creation) is not started here; get_config_logs
        # launches it once the configuration task reports completion.
        return {
            "status": "running",
            "config_path": config_path,
            "logs": all_logs,
        }
    except Exception as e:
        # Endpoint boundary: report the failure in the payload rather than
        # letting it propagate.
        return {
            "status": "error",
            "error": str(e),
            "logs": all_logs,
        }
async def get_config_logs(session_id: str):
    """
    Report the logs of a configuration task and chain into benchmark creation.

    When the configuration task has completed and no benchmark task is
    registered for the session yet, a benchmark task is created, stored in
    ``active_bench_tasks`` and run; failures while doing so are appended to
    the returned logs instead of being raised.

    Args:
        session_id: Session ID for the task.

    Returns:
        Dictionary with ``logs`` and ``is_completed``.

    Raises:
        HTTPException: 404 when no configuration task exists for the session.
    """
    if session_id not in active_config_tasks:
        raise HTTPException(status_code=404, detail="Configuration task not found")

    task = active_config_tasks[session_id]
    response = {
        "logs": task.get_logs(),
        "is_completed": task.is_task_completed(),
    }

    # Nothing more to do while configuration is still running, or once the
    # benchmark task for this session has already been registered.
    if not response["is_completed"] or session_id in active_bench_tasks:
        return response

    try:
        benchmark = CreateBenchTask(
            session_uid=session_id,
            config_path=f"uploaded_files/{session_id}/config.yml",
        )
        # Register before running so the benchmark logs can be polled later.
        active_bench_tasks[session_id] = benchmark
        # Transition marker between the two phases.
        response["logs"].append(
            "[INFO] Configuration file generated, starting benchmark creation"
        )
        benchmark.run()
    except Exception as bench_error:
        response["logs"].append(
            f"[ERROR] Error starting benchmark creation: {bench_error!s}"
        )

    return response
async def get_benchmark_logs(session_id: str):
    """
    Report the logs and completion state of a benchmark task.

    Args:
        session_id: Session ID for the task.

    Returns:
        Dictionary with ``logs`` and ``is_completed``.

    Raises:
        HTTPException: 404 when no benchmark task exists for the session.
    """
    if session_id in active_bench_tasks:
        task = active_bench_tasks[session_id]
        return {
            "logs": task.get_logs(),
            "is_completed": task.is_task_completed(),
        }

    raise HTTPException(status_code=404, detail="Benchmark task not found")