# demo/backend/tasks/evaluation_task.py
"""
Task to run evaluation using lighteval
"""
import os
import time
import tempfile
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime
import json
import shutil
from typing import List, Optional
from tasks.get_model_providers import get_model_providers
from huggingface_hub import HfApi
import asyncio
# Default timeout value
DEFAULT_EVALUATION_TIMEOUT = 60.0  # 1 minute by default
class EvaluationTask:
"""
Task to run evaluation using lighteval
"""
    def __init__(self, session_uid: str, dataset_name: str, clean_old_results: bool = False, timeout: Optional[float] = None):
"""
Initialize the evaluation task
Args:
session_uid: Session ID for this task
dataset_name: Name of the dataset to evaluate
clean_old_results: If True, clean old results before evaluation
timeout: Timeout in seconds for each model evaluation (if None, uses default)
"""
self.session_uid = session_uid
self.dataset_name = dataset_name
self.is_completed = False
self.results = []
self.hf_api = HfApi()
self.timeout = timeout if timeout is not None else DEFAULT_EVALUATION_TIMEOUT
        # Clean old results if requested
if clean_old_results:
self.clean_old_results()
def clean_old_results(self) -> None:
"""
Clean old evaluation results to avoid confusion
"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking and cleaning old results...")
# Path to LightEval results
results_dir = Path(f"uploaded_files/{self.session_uid}/lighteval_results")
# Delete if exists
if results_dir.exists():
print(f"[{datetime.now().strftime('%H:%M:%S')}] Deleting old LightEval results")
shutil.rmtree(results_dir)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning complete")
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No old results found")
# Also check for intermediate lighteval results
if os.path.exists("data/lighteval_results"):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning intermediate results")
try:
shutil.rmtree("data/lighteval_results", ignore_errors=True)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error cleaning intermediate results: {str(e)}")
def _save_results_to_hub(self) -> None:
"""
Save evaluation results directly to the dataset on the Hub without persisting locally
"""
try:
            # Sort results by accuracy (most accurate first)
sorted_results = sorted(self.results, key=lambda x: x.get('accuracy', 0), reverse=True)
            # Create a temporary file for the results
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
                # Add metadata to the results
final_results = {
"metadata": {
"evaluation_date": datetime.now().isoformat(),
"session_id": self.session_uid,
"dataset_name": self.dataset_name
},
"results": sorted_results
}
json.dump(final_results, temp_file, indent=2)
temp_file_path = temp_file.name
# Push to Hub
self.hf_api.upload_file(
path_or_fileobj=temp_file_path,
path_in_repo="lighteval_results.json",
repo_id=self.dataset_name,
repo_type="dataset",
commit_message="Add lighteval evaluation results"
)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")
            # Delete the temporary file
os.unlink(temp_file_path)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")
    async def _run_lighteval(self, model_name: str, provider: str) -> dict:
        """
        Run a single lighteval evaluation for one model/provider pair and return its result as a dict
        """
start_time = time.time()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")
        # Create a temporary task file (mkstemp avoids the insecure, deprecated mktemp)
        fd, temp_file_path = tempfile.mkstemp(suffix=".py")
        with os.fdopen(fd, 'w') as temp_file:
temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task
# Create yourbench task
yourbench = create_yourbench_task("{self.dataset_name}", "single_shot_questions")
# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
# Create output directory in the session folder
output_dir = f"uploaded_files/{self.session_uid}/lighteval_results"
os.makedirs(output_dir, exist_ok=True)
# LightEval command
cmd_args = [
"lighteval",
"endpoint",
"inference-providers",
f"model={model_name},provider={provider}",
"custom|yourbench|0|0",
"--custom-tasks",
temp_file_path,
"--max-samples", "30",
"--output-dir", output_dir,
"--save-details",
"--no-push-to-hub"
]
try:
            # Run the command with environment variables and the configured per-model timeout
process = await asyncio.create_subprocess_exec(
*cmd_args,
env=os.environ,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
await asyncio.wait_for(process.communicate(), timeout=self.timeout)
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()  # Reap the killed process before returning
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")
# Clean up temporary files
os.unlink(temp_file_path)
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": self.timeout,
"status": "timeout"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error running evaluation for {model_name}: {str(e)}")
# Clean up temporary files
os.unlink(temp_file_path)
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": time.time() - start_time,
"status": "error"
}
# Calculate execution time
execution_time = time.time() - start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s")
try:
            # Get results from the output file (lighteval writes them under results/<model_name>)
            results_dir = Path(output_dir) / "results" / model_name
results_file = next(results_dir.glob("results_*.json"))
with open(results_file) as f:
results = json.load(f)
accuracy = results["results"]["all"]["accuracy"]
result_data = {
"model": model_name,
"provider": provider,
"accuracy": accuracy,
"execution_time": execution_time,
"status": "success"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}")
result_data = {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": execution_time,
"status": "parse_error"
}
# Clean up temporary files
os.unlink(temp_file_path)
return result_data
async def run(self, clean_first: bool = True) -> None:
"""
Run the evaluation task asynchronously
Args:
clean_first: If True, clean old results before starting (default: True)
"""
        # Clean old results before starting if requested
        if clean_first:
            self.clean_old_results()
# Start global timer
script_start_time = time.time()
# Load environment variables
load_dotenv()
# Models to evaluate
models = [
"Qwen/QwQ-32B",
"Qwen/Qwen2.5-72B-Instruct",
"deepseek-ai/DeepSeek-V3-0324",
"deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
]
# Get providers for each model
model_providers = get_model_providers(models)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting parallel evaluations")
# Run evaluations in parallel using asyncio
tasks = []
for model_name, providers in model_providers:
if providers: # Only run if providers are available
tasks.append(self._run_lighteval(model_name, providers[0]))
self.results = await asyncio.gather(*tasks)
# Calculate total script execution time
total_time = time.time() - script_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] All evaluations completed in {total_time:.2f}s")
# Cleanup intermediate results if they exist
if os.path.exists("data/lighteval_results"):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning up intermediate results")
try:
                # Recursively delete intermediate results (shutil is already imported at module level)
                shutil.rmtree("data/lighteval_results", ignore_errors=True)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Warning: Failed to clean up intermediate results: {str(e)}")
# Save final results to Hub (only once)
self._save_results_to_hub()
# Mark the task as completed
self.is_completed = True
def get_logs(self) -> List[str]:
"""
Get logs for this task (empty list since we don't track logs anymore)
Returns:
Empty list of logs
"""
return []
def is_task_completed(self) -> bool:
"""
Check if the task is completed
Returns:
True if completed, False otherwise
"""
return self.is_completed
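

# Minimal usage sketch (hypothetical): drive EvaluationTask directly from the command line.
# The session uid and dataset name below are placeholder values; running this assumes the
# usual environment for this module (HF credentials, lighteval, the tasks package) is set up.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run a lighteval evaluation for a dataset")
    parser.add_argument("dataset_name", help="Hub dataset to evaluate, e.g. 'org/yourbench-dataset'")
    parser.add_argument("--session-uid", default="local-debug-session", help="Session folder under uploaded_files/")
    parser.add_argument("--timeout", type=float, default=None, help="Per-model timeout in seconds")
    args = parser.parse_args()

    task = EvaluationTask(
        session_uid=args.session_uid,
        dataset_name=args.dataset_name,
        timeout=args.timeout,
    )
    # run() is a coroutine, so execute it inside an event loop
    asyncio.run(task.run())
    print(f"Completed: {task.is_task_completed()} - {len(task.results)} model results")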