from fastapi import APIRouter, HTTPException
from typing import Dict, Any
import os
import json
from tasks.evaluationTask import EvaluationTask

router = APIRouter(tags=["evaluation"])

# Store active evaluation tasks by session_id
active_evaluation_tasks = {}


@router.post("/evaluate-benchmark")
async def evaluate_benchmark(data: Dict[str, Any]):
    """
    Start the evaluation of a benchmark for a given session

    Args:
        data: Dictionary containing the session_id

    Returns:
        Dictionary with status and initial logs
    """
    session_id = data.get("session_id")

    if not session_id:
        return {"error": "Missing or invalid session ID"}

    # Check whether an evaluation is already running for this session
    if session_id in active_evaluation_tasks:
        evaluation_task = active_evaluation_tasks[session_id]

        # If the evaluation has already finished, a new one can be started
        if evaluation_task.is_task_completed():
            # Remove the old task
            del active_evaluation_tasks[session_id]
        else:
            # An evaluation is already in progress
            return {
                "status": "already_running",
                "message": "An evaluation is already running for this session",
                "logs": evaluation_task.get_logs()
            }

    try:
        # Dataset name derived from the session ID
        dataset_name = f"yourbench_{session_id}"

        # Create and start a new evaluation task
        evaluation_task = EvaluationTask(session_uid=session_id, dataset_name=dataset_name)
        active_evaluation_tasks[session_id] = evaluation_task

        # Start the evaluation asynchronously
        evaluation_task.run()

        # Retrieve the initial logs
        initial_logs = evaluation_task.get_logs()

        return {
            "status": "started",
            "message": f"Evaluation started for benchmark {dataset_name}",
            "logs": initial_logs
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "message": f"Error starting the evaluation: {str(e)}"
        }


@router.get("/evaluation-logs/{session_id}")
async def get_evaluation_logs(session_id: str):
    """
    Retrieve the logs of a running evaluation

    Args:
        session_id: ID of the session to retrieve logs for

    Returns:
        Dictionary with logs and completion status
    """
    if session_id not in active_evaluation_tasks:
        raise HTTPException(status_code=404, detail="Evaluation task not found")

    evaluation_task = active_evaluation_tasks[session_id]
    logs = evaluation_task.get_logs()
    is_completed = evaluation_task.is_task_completed()

    # Retrieve the results if they are available and the evaluation has finished
    results = None
    if is_completed and hasattr(evaluation_task, 'results') and evaluation_task.results:
        results = evaluation_task.results

    return {
        "logs": logs,
        "is_completed": is_completed,
        "results": results
    }


@router.get("/evaluation-results/{session_id}")
async def get_evaluation_results(session_id: str):
    """
    Retrieve results of a completed evaluation

    Args:
        session_id: Session ID to retrieve results for

    Returns:
        Dictionary with evaluation results
    """
    # First, check if the task is in memory
    if session_id in active_evaluation_tasks:
        evaluation_task = active_evaluation_tasks[session_id]

        if not evaluation_task.is_task_completed():
            return {
                "success": False,
                "message": "Evaluation is still in progress"
            }

        if hasattr(evaluation_task, 'results') and evaluation_task.results:
            return {
                "success": True,
                "results": evaluation_task.results
            }

    # If we get here, either the task is not in memory or it doesn't have results
    # Try to load results from file
    try:
        # Construct the path to the results file
        results_path = f"uploaded_files/{session_id}/lighteval_results/models_comparison.json"

        # Check if the file exists
        if not os.path.exists(results_path):
return { "success": False, "message": "No evaluation results found for this session" } # Read the file import json with open(results_path, 'r') as f: results = json.load(f) return { "success": True, "results": results } except Exception as e: return { "success": False, "message": f"Error retrieving evaluation results: {str(e)}" }