"""
Task to run evaluation using lighteval
"""
import os
import time
import tempfile
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime
import json
import shutil
from typing import List, Dict, Optional
from tasks.get_model_providers import get_model_providers
from tasks.get_available_model_provider import get_available_model_provider
from huggingface_hub import HfApi
import asyncio
# Default timeout in seconds for each model evaluation
DEFAULT_EVALUATION_TIMEOUT = 70.0
class EvaluationTask:
"""
Task to run evaluation using lighteval
"""
    def __init__(self, session_uid: str, dataset_name: str, clean_old_results: bool = False, timeout: Optional[float] = None):
"""
Initialize the evaluation task
Args:
session_uid: Session ID for this task
dataset_name: Name of the dataset to evaluate
clean_old_results: If True, clean old results before evaluation
timeout: Timeout in seconds for each model evaluation (if None, uses default)
"""
self.session_uid = session_uid
self.dataset_name = dataset_name
self.is_completed = False
self.results = []
self.hf_api = HfApi()
self.timeout = timeout if timeout is not None else DEFAULT_EVALUATION_TIMEOUT
self.current_step = "initializing"
self.completed_steps = []
        # Clean old results if requested
if clean_old_results:
self.clean_old_results()
def update_step(self, step: str) -> None:
"""
Update the current step and completed steps
Args:
step: Name of the step to update
"""
self.current_step = step
if step not in self.completed_steps:
self.completed_steps.append(step)
def get_progress(self) -> Dict:
"""
Get the current progress of the task
Returns:
Dictionary containing current step and completed steps
"""
return {
"current_step": self.current_step,
"completed_steps": self.completed_steps
}
def clean_old_results(self) -> None:
"""
Clean old evaluation results to avoid confusion
"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking and cleaning old results...")
# Path to LightEval results
results_dir = Path(f"uploaded_files/{self.session_uid}/lighteval_results")
# Delete if exists
if results_dir.exists():
print(f"[{datetime.now().strftime('%H:%M:%S')}] Deleting old LightEval results")
shutil.rmtree(results_dir)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning complete")
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No old results found")
# Also check for intermediate lighteval results
if os.path.exists("data/lighteval_results"):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning intermediate results")
try:
shutil.rmtree("data/lighteval_results", ignore_errors=True)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error cleaning intermediate results: {str(e)}")
def _save_results_to_hub(self) -> None:
"""
Save evaluation results directly to the dataset on the Hub without persisting locally
"""
try:
            # Sort results by accuracy (highest to lowest)
sorted_results = sorted(self.results, key=lambda x: x.get('accuracy', 0), reverse=True)
            # Write the results to a temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
                # Add metadata to the results
final_results = {
"metadata": {
"evaluation_date": datetime.now().isoformat(),
"session_id": self.session_uid,
"dataset_name": self.dataset_name
},
"results": sorted_results
}
json.dump(final_results, temp_file, indent=2)
temp_file_path = temp_file.name
# Push to Hub
self.hf_api.upload_file(
path_or_fileobj=temp_file_path,
path_in_repo="lighteval_results.json",
repo_id=self.dataset_name,
repo_type="dataset",
commit_message="Add lighteval evaluation results"
)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")
            # Remove the temporary file
os.unlink(temp_file_path)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")
async def _run_lighteval(self, model_name: str, provider: str) -> dict:
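        """
        Run lighteval for a single model/provider pair

        Args:
            model_name: Hugging Face model ID to evaluate
            provider: Inference provider used to serve the model

        Returns:
            Dictionary with model, provider, accuracy, execution_time and status
        """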
start_time = time.time()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")
        # Create a temporary custom-task file that lighteval loads via --custom-tasks
        with tempfile.NamedTemporaryFile(mode='w', suffix=".py", delete=False) as temp_file:
            temp_file_path = temp_file.name
temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task
# Create yourbench task
yourbench = create_yourbench_task("{self.dataset_name}", "single_shot_questions")
# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
# Create output directory in the session folder
output_dir = f"uploaded_files/{self.session_uid}/lighteval_results"
os.makedirs(output_dir, exist_ok=True)
# LightEval command
cmd_args = [
"lighteval",
"endpoint",
"inference-providers",
f"model={model_name},provider={provider}",
"custom|yourbench|0|0",
"--custom-tasks",
temp_file_path,
"--max-samples", "30",
"--output-dir", output_dir,
"--save-details",
"--no-push-to-hub"
]
try:
            # Run the command with the current environment; the configured timeout is enforced via asyncio.wait_for below
process = await asyncio.create_subprocess_exec(
*cmd_args,
env=os.environ,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
await asyncio.wait_for(process.communicate(), timeout=self.timeout)
except asyncio.TimeoutError:
process.kill()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")
# Clean up temporary files
os.unlink(temp_file_path)
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": self.timeout,
"status": "timeout"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error running evaluation for {model_name}: {str(e)}")
# Clean up temporary files
os.unlink(temp_file_path)
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": time.time() - start_time,
"status": "error"
}
# Calculate execution time
execution_time = time.time() - start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s")
try:
# Get results from the output file
            results_dir = Path(output_dir) / "results" / model_name
results_file = next(results_dir.glob("results_*.json"))
with open(results_file) as f:
results = json.load(f)
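            # Aggregated accuracy reported by lighteval under results["all"]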
accuracy = results["results"]["all"]["accuracy"]
result_data = {
"model": model_name,
"provider": provider,
"accuracy": accuracy,
"execution_time": execution_time,
"status": "success"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}")
result_data = {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": execution_time,
"status": "parse_error"
}
# Clean up temporary files
os.unlink(temp_file_path)
return result_data
async def run(self, clean_first: bool = True) -> None:
"""
Run the evaluation task asynchronously
Args:
clean_first: If True, clean old results before starting (default: True)
"""
        # Clean old results before starting, if requested
        if clean_first:
            self.clean_old_results()
# Start global timer
script_start_time = time.time()
# Load environment variables
load_dotenv()
# Models to evaluate
models = [
"Qwen/QwQ-32B",
"Qwen/Qwen2.5-72B-Instruct",
"meta-llama/Llama-3.3-70B-Instruct",
"deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
"mistralai/Mistral-Small-24B-Instruct-2501",
]
# Step 1: Check available providers for each model
self.update_step("finding_available_model_providers")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking available providers for models...")
model_providers = {}
for model in models:
provider = get_available_model_provider(model, verbose=True)
if provider:
model_providers[model] = provider
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No available provider found for {model}")
if not model_providers:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No models with available providers found")
return
print(f"[{datetime.now().strftime('%H:%M:%S')}] Found providers for {len(model_providers)} models")
# Step 2: Run evaluations in parallel
self.update_step("starting_evaluation_process")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation process...")
# Step 3: Evaluate models
self.update_step("evaluating_models")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluating models...")
tasks = []
for model, provider in model_providers.items():
tasks.append(self._run_lighteval(model, provider))
# Run all evaluations in parallel
results = await asyncio.gather(*tasks)
# Filter out failed evaluations
self.results = [r for r in results if r["status"] == "success"]
# Step 4: Save results
self.update_step("storing_evaluation_results")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Storing evaluation results...")
self._save_results_to_hub()
# Mark task as completed
self.is_completed = True
self.update_step("completed")
total_time = time.time() - script_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation completed in {total_time:.2f}s")
def get_logs(self) -> List[str]:
"""
Get the logs of the task
Returns:
List of log messages
"""
return self.logs if hasattr(self, "logs") else []
def is_task_completed(self) -> bool:
"""
Check if the task is completed
Returns:
True if the task is completed, False otherwise
"""
return self.is_completed
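
# --- Usage sketch (illustrative only) ---
# A minimal way to drive this task directly, assuming Hugging Face credentials
# are available in the environment. The session UID and dataset repo below are
# hypothetical placeholders, not values used by the real application.
if __name__ == "__main__":
    example_task = EvaluationTask(
        session_uid="example-session",          # hypothetical session ID
        dataset_name="your-org/your-dataset",   # hypothetical dataset repo
        timeout=120.0,                          # per-model timeout in seconds
    )
    asyncio.run(example_task.run())
    print(example_task.get_progress())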