# demo/backend/tasks/evaluation_task.py
"""
Task to run evaluation using lighteval
"""
import os
import time
import tempfile
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime
import json
import shutil
from typing import List, Dict
from tasks.get_available_model_provider import get_available_model_provider
from huggingface_hub import HfApi
import asyncio
# Default timeout value
DEFAULT_EVALUATION_TIMEOUT = 120.0  # 2 minutes by default
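# Note: this timeout bounds each individual model evaluation (enforced via asyncio.wait_for in _run_lighteval), not the whole run.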
class EvaluationTask:
"""
Task to run evaluation using lighteval
"""
def __init__(self, session_uid: str, dataset_name: str, clean_old_results: bool = False, timeout: float = None):
"""
Initialize the evaluation task
Args:
session_uid: Session ID for this task
dataset_name: Name of the dataset to evaluate
clean_old_results: If True, clean old results before evaluation
timeout: Timeout in seconds for each model evaluation (if None, uses default)
"""
self.session_uid = session_uid
self.dataset_name = dataset_name
self.is_completed = False
self.results = []
self.hf_api = HfApi()
self.timeout = timeout if timeout is not None else DEFAULT_EVALUATION_TIMEOUT
self.current_step = "initializing"
self.completed_steps = []
self.step_start_time = time.time() # Record the start time of the current step
# Clean old results if requested
if clean_old_results:
self.clean_old_results()
async def update_step(self, step: str) -> None:
"""
Update the current step and completed steps with a minimum delay of 1 second
Args:
step: Name of the step to update
"""
# Compute the time elapsed since the previous step started
elapsed_since_step_start = time.time() - self.step_start_time
# If less than one second has elapsed, wait out the remainder of the second
if elapsed_since_step_start < 1.0:
await asyncio.sleep(1.0 - elapsed_since_step_start)
# Update the current step and record the new timestamp
self.current_step = step
self.step_start_time = time.time()
# Add to completed steps if not already present
if step not in self.completed_steps:
self.completed_steps.append(step)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Step changed to: {step}")
def get_progress(self) -> Dict:
"""
Get the current progress of the task
Returns:
Dictionary containing current step and completed steps
"""
return {
"current_step": self.current_step,
"completed_steps": self.completed_steps
}
def clean_old_results(self) -> None:
"""
Clean old evaluation results to avoid confusion
"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking and cleaning old results...")
# Path to LightEval results
results_dir = Path(f"uploaded_files/{self.session_uid}/lighteval_results")
# Delete if exists
if results_dir.exists():
print(f"[{datetime.now().strftime('%H:%M:%S')}] Deleting old LightEval results")
shutil.rmtree(results_dir)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning complete")
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No old results found")
# Also check for intermediate lighteval results
if os.path.exists("data/lighteval_results"):
print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning intermediate results")
try:
shutil.rmtree("data/lighteval_results", ignore_errors=True)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error cleaning intermediate results: {str(e)}")
def _save_results_to_hub(self) -> None:
"""
Save evaluation results directly to the dataset on the Hub without persisting locally
"""
try:
# Sort results by accuracy (most accurate first)
sorted_results = sorted(self.results, key=lambda x: x.get('accuracy', 0), reverse=True)
# Create a temporary file for the results
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
# Add metadata to the results
final_results = {
"metadata": {
"evaluation_date": datetime.now().isoformat(),
"session_id": self.session_uid,
"dataset_name": self.dataset_name
},
"results": sorted_results
}
json.dump(final_results, temp_file, indent=2)
temp_file_path = temp_file.name
# Push to Hub
self.hf_api.upload_file(
path_or_fileobj=temp_file_path,
path_in_repo="lighteval_results.json",
repo_id=self.dataset_name,
repo_type="dataset",
commit_message="Add lighteval evaluation results"
)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")
# Delete the temporary file
os.unlink(temp_file_path)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")
async def _run_lighteval(self, model_name: str, provider: str) -> dict:
start_time = time.time()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")
# Create temporary task file
# mkstemp avoids the race condition of the deprecated tempfile.mktemp
temp_fd, temp_file_path = tempfile.mkstemp(suffix=".py")
os.close(temp_fd)
with open(temp_file_path, 'w') as temp_file:
temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task
# Create yourbench task
yourbench = create_yourbench_task("{self.dataset_name}", "single_shot_questions")
# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
# Create output directory in the session folder
output_dir = f"uploaded_files/{self.session_uid}/lighteval_results"
os.makedirs(output_dir, exist_ok=True)
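# The temporary module written above must expose TASKS_TABLE; lighteval discovers it via the --custom-tasks flag below.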
# LightEval command
cmd_args = [
"lighteval",
"endpoint",
"inference-providers",
f"model={model_name},provider={provider}",
"custom|yourbench|0|0",
"--custom-tasks",
temp_file_path,
"--max-samples", "30",
"--output-dir", output_dir,
"--save-details",
"--no-push-to-hub"
]
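# The task spec "custom|yourbench|0|0" follows lighteval's "suite|task|num_fewshot|truncate_fewshot"
# convention (zero few-shot examples here); --max-samples 30 caps how many questions are evaluated per model.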
try:
# Launch lighteval as a subprocess with the current environment; the per-model timeout is enforced below via asyncio.wait_for
process = await asyncio.create_subprocess_exec(
*cmd_args,
env=os.environ,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Running command: {' '.join(cmd_args)}")
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.timeout)
# Log stdout and stderr
if stdout:
stdout_decoded = stdout.decode('utf-8')
print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval STDOUT for {model_name}:")
for line in stdout_decoded.splitlines():
print(f"[STDOUT] {line}")
if stderr:
stderr_decoded = stderr.decode('utf-8')
print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval STDERR for {model_name}:")
for line in stderr_decoded.splitlines():
print(f"[STDERR] {line}")
# Check return code
if process.returncode != 0:
print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval failed with return code {process.returncode}")
except asyncio.TimeoutError:
process.kill()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")
# Clean up temporary files
os.unlink(temp_file_path)
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": self.timeout,
"status": "timeout"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error running evaluation for {model_name}: {str(e)}")
# Clean up temporary files
os.unlink(temp_file_path)
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": time.time() - start_time,
"status": "error"
}
# Calculate execution time
execution_time = time.time() - start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s")
try:
# Get results from the output file
# lighteval writes per-model results under <output_dir>/results/<org>/<model>/results_*.json
results_dir = Path(output_dir) / "results" / model_name
print(f"[{datetime.now().strftime('%H:%M:%S')}] Looking for results in {results_dir}")
if not results_dir.exists():
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results directory doesn't exist for {model_name}")
raise FileNotFoundError(f"Results directory not found: {results_dir}")
results_files = list(results_dir.glob("results_*.json"))
if not results_files:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No results files found in {results_dir}")
raise FileNotFoundError(f"No results files found in {results_dir}")
results_file = results_files[0]
print(f"[{datetime.now().strftime('%H:%M:%S')}] Using results file: {results_file}")
with open(results_file) as f:
results = json.load(f)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results structure: {json.dumps(list(results.keys()))}")
# Check that the results have the expected structure
if "results" in results and "all" in results["results"] and "accuracy" in results["results"]["all"]:
accuracy = results["results"]["all"]["accuracy"]
print(f"[{datetime.now().strftime('%H:%M:%S')}] Extracted accuracy: {accuracy}")
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Unexpected results structure. Available keys: {list(results.keys())}")
if "results" in results:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Keys in 'results': {list(results['results'].keys()) if isinstance(results['results'], dict) else 'not a dictionary'}")
raise ValueError(f"Unexpected results structure for {model_name}")
result_data = {
"model": model_name,
"provider": provider,
"accuracy": accuracy,
"execution_time": execution_time,
"status": "success"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}")
result_data = {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": execution_time,
"status": "parse_error"
}
# Clean up temporary files
os.unlink(temp_file_path)
return result_data
async def run(self, clean_first: bool = True) -> None:
"""
Run the evaluation task asynchronously
Args:
clean_first: If True, clean old results before starting (default: True)
"""
# Systematically clean old results before starting
self.clean_old_results()
# Start global timer
script_start_time = time.time()
# Load environment variables
load_dotenv()
# Models to evaluate - only models with accessible providers
models = [
"Qwen/QwQ-32B",
"Qwen/Qwen2.5-72B-Instruct",
"Qwen/Qwen2.5-32B-Instruct",
"meta-llama/Llama-3.1-8B-Instruct",
"meta-llama/Llama-3.3-70B-Instruct",
"deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
"mistralai/Mistral-Small-24B-Instruct-2501",
]
# Log the dataset structure for inspection
try:
from datasets import load_dataset
print(f"[{datetime.now().strftime('%H:%M:%S')}] Attempting to load dataset {self.dataset_name} for inspection")
dataset = load_dataset(self.dataset_name, "single_shot_questions", split="train")
# Check the structure of the first example
if len(dataset) > 0:
first_example = dataset[0]
print(f"[{datetime.now().strftime('%H:%M:%S')}] Structure of the first example:")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Keys: {first_example.keys()}")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Citations: {first_example.get('citations', 'not found')}")
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Error while inspecting dataset: {str(e)}")
# Step 1: Check available providers for each model
await self.update_step("finding_available_model_providers")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking available providers for models...")
model_providers = {}
for model in models:
provider = get_available_model_provider(model, verbose=True)
if provider:
model_providers[model] = provider
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No available provider found for {model}")
if not model_providers:
print(f"[{datetime.now().strftime('%H:%M:%S')}] No models with available providers found")
return
print(f"[{datetime.now().strftime('%H:%M:%S')}] Found providers for {len(model_providers)} models")
# Step 2: Run evaluations in parallel
await self.update_step("starting_evaluation_process")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation process...")
# Step 3: Evaluate models
await self.update_step("evaluating_models")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluating models...")
tasks = []
for model, provider in model_providers.items():
tasks.append(self._run_lighteval(model, provider))
# Run all evaluations in parallel
results = await asyncio.gather(*tasks)
# Filter out failed evaluations
self.results = [r for r in results if r["status"] == "success"]
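# Only successful runs are kept; evaluations that ended in "timeout", "error" or "parse_error" are dropped before upload.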
# Step 4: Save results
await self.update_step("storing_evaluation_results")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Storing evaluation results...")
self._save_results_to_hub()
# Mark task as completed
self.is_completed = True
await self.update_step("completed")
total_time = time.time() - script_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation completed in {total_time:.2f}s")
def get_logs(self) -> List[str]:
"""
Get the logs of the task
Returns:
List of log messages
"""
return self.logs if hasattr(self, "logs") else []
def is_task_completed(self) -> bool:
"""
Check if the task is completed
Returns:
True if the task is completed, False otherwise
"""
return self.is_completed
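# Minimal usage sketch (illustrative only): the session UID and dataset name below are
# placeholders, not values used by the real application.
if __name__ == "__main__":
    import uuid
    task = EvaluationTask(
        session_uid=str(uuid.uuid4()),
        dataset_name="your-username/yourbench-dataset",  # hypothetical dataset id
        timeout=300.0,
    )
    asyncio.run(task.run())
    print(task.get_progress())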