""" Task to run evaluation using lighteval """ import os import time import subprocess import tempfile from pathlib import Path import concurrent.futures from dotenv import load_dotenv from datetime import datetime import json import shutil from typing import List, Dict from tasks.get_available_model_provider import get_available_model_provider from huggingface_hub import HfApi import asyncio # Valeur par défaut du timeout DEFAULT_EVALUATION_TIMEOUT = 120.0 # 1 minute par défaut class EvaluationTask: """ Task to run evaluation using lighteval """ def __init__(self, session_uid: str, dataset_name: str, clean_old_results: bool = False, timeout: float = None): """ Initialize the evaluation task Args: session_uid: Session ID for this task dataset_name: Name of the dataset to evaluate clean_old_results: If True, clean old results before evaluation timeout: Timeout in seconds for each model evaluation (if None, uses default) """ self.session_uid = session_uid self.dataset_name = dataset_name self.is_completed = False self.results = [] self.hf_api = HfApi() self.timeout = timeout if timeout is not None else DEFAULT_EVALUATION_TIMEOUT self.current_step = "initializing" self.completed_steps = [] self.step_start_time = time.time() # Enregistrer le temps de début de l'étape actuelle # Nettoyer les anciens résultats si demandé if clean_old_results: self.clean_old_results() async def update_step(self, step: str) -> None: """ Update the current step and completed steps with a minimum delay of 1 second Args: step: Name of the step to update """ # Calculer le temps écoulé depuis le début de l'étape précédente elapsed_since_step_start = time.time() - self.step_start_time # Si moins d'une seconde s'est écoulée, attendre pour compléter la seconde if elapsed_since_step_start < 1.0: await asyncio.sleep(1.0 - elapsed_since_step_start) # Mettre à jour l'étape courante et enregistrer le nouvel horodatage self.current_step = step self.step_start_time = time.time() # Ajouter aux étapes complétées si nécessaire if step not in self.completed_steps: self.completed_steps.append(step) print(f"[{datetime.now().strftime('%H:%M:%S')}] Step changed to: {step}") def get_progress(self) -> Dict: """ Get the current progress of the task Returns: Dictionary containing current step and completed steps """ return { "current_step": self.current_step, "completed_steps": self.completed_steps } def clean_old_results(self) -> None: """ Clean old evaluation results to avoid confusion """ print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking and cleaning old results...") # Path to LightEval results results_dir = Path(f"uploaded_files/{self.session_uid}/lighteval_results") # Delete if exists if results_dir.exists(): print(f"[{datetime.now().strftime('%H:%M:%S')}] Deleting old LightEval results") shutil.rmtree(results_dir) print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning complete") else: print(f"[{datetime.now().strftime('%H:%M:%S')}] No old results found") # Also check for intermediate lighteval results if os.path.exists("data/lighteval_results"): print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning intermediate results") try: shutil.rmtree("data/lighteval_results", ignore_errors=True) except Exception as e: print(f"[{datetime.now().strftime('%H:%M:%S')}] Error cleaning intermediate results: {str(e)}") def _save_results_to_hub(self) -> None: """ Save evaluation results directly to the dataset on the Hub without persisting locally """ try: # Trier les résultats par précision (du plus précis au moins précis) sorted_results = 
            sorted_results = sorted(self.results, key=lambda x: x.get('accuracy', 0), reverse=True)

            # Create a temporary file for the results
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
                # Add metadata to the results
                final_results = {
                    "metadata": {
                        "evaluation_date": datetime.now().isoformat(),
                        "session_id": self.session_uid,
                        "dataset_name": self.dataset_name
                    },
                    "results": sorted_results
                }
                json.dump(final_results, temp_file, indent=2)
                temp_file_path = temp_file.name

            # Push to Hub
            self.hf_api.upload_file(
                path_or_fileobj=temp_file_path,
                path_in_repo="lighteval_results.json",
                repo_id=self.dataset_name,
                repo_type="dataset",
                commit_message="Add lighteval evaluation results"
            )

            print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")

            # Remove the temporary file
            os.unlink(temp_file_path)
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")

    async def _run_lighteval(self, model_name: str, provider: str) -> dict:
        start_time = time.time()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")

        # Create temporary task file
        temp_file_path = tempfile.mktemp(suffix=".py")
        with open(temp_file_path, 'w') as temp_file:
            temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{self.dataset_name}", "single_shot_questions")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")

        # Create output directory in the session folder
        output_dir = f"uploaded_files/{self.session_uid}/lighteval_results"
        os.makedirs(output_dir, exist_ok=True)

        # LightEval command
        cmd_args = [
            "lighteval",
            "endpoint",
            "inference-providers",
            f"model={model_name},provider={provider}",
            "custom|yourbench|0|0",
            "--custom-tasks",
            temp_file_path,
            "--max-samples", "30",
            "--output-dir", output_dir,
            "--save-details",
            "--no-push-to-hub"
        ]

        try:
            # Run the command with the current environment variables; the configured
            # timeout (self.timeout) is enforced on communicate() below
            process = await asyncio.create_subprocess_exec(
                *cmd_args,
                env=os.environ,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            try:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Running command: {' '.join(cmd_args)}")
                stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.timeout)

                # Log stdout and stderr
                if stdout:
                    stdout_decoded = stdout.decode('utf-8')
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval STDOUT for {model_name}:")
                    for line in stdout_decoded.splitlines():
                        print(f"[STDOUT] {line}")

                if stderr:
                    stderr_decoded = stderr.decode('utf-8')
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval STDERR for {model_name}:")
                    for line in stderr_decoded.splitlines():
                        print(f"[STDERR] {line}")

                # Check return code
                if process.returncode != 0:
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval failed with return code {process.returncode}")
            except asyncio.TimeoutError:
                process.kill()
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")

                # Clean up temporary files
                os.unlink(temp_file_path)

                return {
                    "model": model_name,
                    "provider": provider,
                    "accuracy": 0.0,
                    "execution_time": self.timeout,
                    "status": "timeout"
                }
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Error running evaluation for {model_name}: {str(e)}")

            # Clean up temporary files
            os.unlink(temp_file_path)
model_name, "provider": provider, "accuracy": 0.0, "execution_time": time.time() - start_time, "status": "error" } # Calculate execution time execution_time = time.time() - start_time print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s") try: # Get results from the output file results_dir = Path(output_dir) / "results" / model_name.replace("/", "/") print(f"[{datetime.now().strftime('%H:%M:%S')}] Looking for results in {results_dir}") if not results_dir.exists(): print(f"[{datetime.now().strftime('%H:%M:%S')}] Results directory doesn't exist for {model_name}") raise FileNotFoundError(f"Results directory not found: {results_dir}") results_files = list(results_dir.glob("results_*.json")) if not results_files: print(f"[{datetime.now().strftime('%H:%M:%S')}] No results files found in {results_dir}") raise FileNotFoundError(f"No results files found in {results_dir}") results_file = results_files[0] print(f"[{datetime.now().strftime('%H:%M:%S')}] Using results file: {results_file}") with open(results_file) as f: results = json.load(f) print(f"[{datetime.now().strftime('%H:%M:%S')}] Results structure: {json.dumps(list(results.keys()))}") # Vérifier que la structure est celle attendue if "results" in results and "all" in results["results"] and "accuracy" in results["results"]["all"]: accuracy = results["results"]["all"]["accuracy"] print(f"[{datetime.now().strftime('%H:%M:%S')}] Extracted accuracy: {accuracy}") else: print(f"[{datetime.now().strftime('%H:%M:%S')}] Structure de résultats inattendue. Clés disponibles: {list(results.keys())}") if "results" in results: print(f"[{datetime.now().strftime('%H:%M:%S')}] Clés dans 'results': {list(results['results'].keys()) if isinstance(results['results'], dict) else 'pas un dictionnaire'}") raise ValueError(f"Structure de résultats inattendue pour {model_name}") result_data = { "model": model_name, "provider": provider, "accuracy": accuracy, "execution_time": execution_time, "status": "success" } except Exception as e: print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}") result_data = { "model": model_name, "provider": provider, "accuracy": 0.0, "execution_time": execution_time, "status": "parse_error" } # Clean up temporary files os.unlink(temp_file_path) return result_data async def run(self, clean_first: bool = True) -> None: """ Run the evaluation task asynchronously Args: clean_first: If True, clean old results before starting (default: True) """ # Systematically clean old results before starting self.clean_old_results() # Start global timer script_start_time = time.time() # Load environment variables load_dotenv() # Models to evaluate - uniquement les modèles accessibles models = [ "Qwen/QwQ-32B", "Qwen/Qwen2.5-72B-Instruct", "Qwen/Qwen2.5-32B-Instruct", "meta-llama/Llama-3.1-8B-Instruct", "meta-llama/Llama-3.3-70B-Instruct", "deepseek-ai/DeepSeek-R1-Distill-Llama-70B", "mistralai/Mistral-Small-24B-Instruct-2501", ] # Log pour voir la structure du dataset try: from datasets import load_dataset print(f"[{datetime.now().strftime('%H:%M:%S')}] Tentative de chargement du dataset {self.dataset_name} pour inspection") dataset = load_dataset(self.dataset_name, "single_shot_questions", split="train") # Vérifier la structure du premier exemple if len(dataset) > 0: first_example = dataset[0] print(f"[{datetime.now().strftime('%H:%M:%S')}] Structure du premier exemple:") print(f"[{datetime.now().strftime('%H:%M:%S')}] Clés: 
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Keys: {first_example.keys()}")
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Citations: {first_example.get('citations', 'not found')}")
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Error while inspecting dataset: {str(e)}")

        # Step 1: Check available providers for each model
        await self.update_step("finding_available_model_providers")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking available providers for models...")

        model_providers = {}
        for model in models:
            provider = get_available_model_provider(model, verbose=True)
            if provider:
                model_providers[model] = provider
            else:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] No available provider found for {model}")

        if not model_providers:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] No models with available providers found")
            return

        print(f"[{datetime.now().strftime('%H:%M:%S')}] Found providers for {len(model_providers)} models")

        # Step 2: Start the evaluation process
        await self.update_step("starting_evaluation_process")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation process...")

        # Step 3: Evaluate models in parallel
        await self.update_step("evaluating_models")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluating models...")

        tasks = []
        for model, provider in model_providers.items():
            tasks.append(self._run_lighteval(model, provider))

        # Run all evaluations in parallel
        results = await asyncio.gather(*tasks)

        # Filter out failed evaluations
        self.results = [r for r in results if r["status"] == "success"]

        # Step 4: Save results
        await self.update_step("storing_evaluation_results")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Storing evaluation results...")
        self._save_results_to_hub()

        # Mark task as completed
        self.is_completed = True
        await self.update_step("completed")

        total_time = time.time() - script_start_time
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation completed in {total_time:.2f}s")

    def get_logs(self) -> List[str]:
        """
        Get the logs of the task

        Returns:
            List of log messages
        """
        return self.logs if hasattr(self, "logs") else []

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if the task is completed, False otherwise
        """
        return self.is_completed
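

# Example usage: a minimal sketch of how this task might be driven, not part of the
# original module. The dataset repo id below is hypothetical; running this for real
# requires a YourBench-formatted dataset on the Hub, the lighteval CLI installed,
# and Hugging Face credentials available in the environment.
if __name__ == "__main__":
    async def _demo() -> None:
        task = EvaluationTask(
            session_uid="demo-session",                    # hypothetical session id
            dataset_name="my-org/yourbench-demo-dataset",  # hypothetical dataset repo id
            timeout=300.0,                                 # 5-minute per-model timeout
        )
        await task.run()
        print(task.get_progress())
        print(task.results)

    asyncio.run(_demo())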