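"""API routes for running benchmark evaluations and retrieving their logs and results.

Exposes three endpoints:
- POST /evaluate-benchmark: start an evaluation for a session
- GET /evaluation-logs/{session_id}: poll logs and completion status
- GET /evaluation-results/{session_id}: fetch results from memory or from disk
"""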
from fastapi import APIRouter, HTTPException
from typing import Dict, Any
import json
import os

from tasks.evaluationTask import EvaluationTask

router = APIRouter(tags=["evaluation"])

# Store active evaluation tasks by session_id
active_evaluation_tasks = {}
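# NOTE: this registry is a plain in-process dict, so entries are lost on restart
# and are not shared across worker processes.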

@router.post("/evaluate-benchmark")
async def evaluate_benchmark(data: Dict[str, Any]):
    """
    Launch the evaluation of a benchmark for a given session

    Args:
        data: Dictionary containing session_id

    Returns:
        Dictionary with status and initial logs
    """
    session_id = data.get("session_id")

    if not session_id:
        return {"error": "Missing or invalid session ID"}

    # Check whether an evaluation is already running for this session
    if session_id in active_evaluation_tasks:
        evaluation_task = active_evaluation_tasks[session_id]
        # If the previous evaluation has finished, a new one can be started
        if evaluation_task.is_task_completed():
            # Drop the old task
            del active_evaluation_tasks[session_id]
        else:
            # An evaluation is already in progress
            return {
                "status": "already_running",
                "message": "An evaluation is already running for this session",
                "logs": evaluation_task.get_logs()
            }

    try:
        # Dataset name derived from the session ID
        dataset_name = f"yourbench_{session_id}"

        # Create and start a new evaluation task
        evaluation_task = EvaluationTask(session_uid=session_id, dataset_name=dataset_name)
        active_evaluation_tasks[session_id] = evaluation_task

        # Start the evaluation asynchronously
        evaluation_task.run()

        # Fetch the initial logs
        initial_logs = evaluation_task.get_logs()

        return {
            "status": "started",
            "message": f"Evaluation started for benchmark {dataset_name}",
            "logs": initial_logs
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "message": f"Error while starting the evaluation: {str(e)}"
        }
@router.get("/evaluation-logs/{session_id}")
async def get_evaluation_logs(session_id: str):
"""
Récupérer les logs d'une évaluation en cours
Args:
session_id: ID de la session pour laquelle récupérer les logs
Returns:
Dictionary avec logs et statut de complétion
"""
if session_id not in active_evaluation_tasks:
raise HTTPException(status_code=404, detail="Tâche d'évaluation non trouvée")
evaluation_task = active_evaluation_tasks[session_id]
logs = evaluation_task.get_logs()
is_completed = evaluation_task.is_task_completed()
# Récupérer les résultats si disponibles et l'évaluation est terminée
results = None
if is_completed and hasattr(evaluation_task, 'results') and evaluation_task.results:
results = evaluation_task.results
return {
"logs": logs,
"is_completed": is_completed,
"results": results
}
@router.get("/evaluation-results/{session_id}")
async def get_evaluation_results(session_id: str):
"""
Retrieve results of a completed evaluation
Args:
session_id: Session ID to retrieve results for
Returns:
Dictionary with evaluation results
"""
# First, check if the task is in memory
if session_id in active_evaluation_tasks:
evaluation_task = active_evaluation_tasks[session_id]
if not evaluation_task.is_task_completed():
return {
"success": False,
"message": "Evaluation is still in progress"
}
if hasattr(evaluation_task, 'results') and evaluation_task.results:
return {
"success": True,
"results": evaluation_task.results
}
# If we get here, either the task is not in memory or it doesn't have results
# Try to load results from file
try:
# Construct the path to the results file
results_path = f"uploaded_files/{session_id}/lighteval_results/models_comparison.json"
# Check if the file exists
if not os.path.exists(results_path):
return {
"success": False,
"message": "No evaluation results found for this session"
}
# Read the file
import json
with open(results_path, 'r') as f:
results = json.load(f)
return {
"success": True,
"results": results
}
except Exception as e:
return {
"success": False,
"message": f"Error retrieving evaluation results: {str(e)}"
}
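
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the API): how a client might
# drive these three endpoints end to end. It assumes the router is mounted at
# the application root on http://localhost:8000, that the `httpx` package is
# installed, and uses a made-up session_id; adjust for the real deployment.
#
#   import time
#   import httpx
#
#   base = "http://localhost:8000"
#   session_id = "my-session-id"  # hypothetical value
#
#   # Start the evaluation
#   httpx.post(f"{base}/evaluate-benchmark", json={"session_id": session_id})
#
#   # Poll the logs until the task reports completion
#   while True:
#       state = httpx.get(f"{base}/evaluation-logs/{session_id}").json()
#       if state["is_completed"]:
#           break
#       time.sleep(2)
#
#   # Fetch the final results (served from memory or from disk)
#   results = httpx.get(f"{base}/evaluation-results/{session_id}").json()
# ---------------------------------------------------------------------------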