""" Task to evaluate models on a YourbBench dataset using LightEval """ import os import sys import json import time import tempfile import asyncio import threading from pathlib import Path from typing import Optional, List, Dict, Any, Tuple from loguru import logger from huggingface_hub import HfApi, CommitOperationAdd from tasks.yourbench_lighteval_task import create_yourbench_task class EvaluationTask: """ Task to evaluate models using LightEval on a YourbBench dataset """ def __init__(self, session_uid: str, dataset_name: str): """ Initialize the evaluation task Args: session_uid: Session ID for this task dataset_name: Name of the dataset to evaluate """ self.session_uid = session_uid self.dataset_name = dataset_name self.logs: List[str] = [] self.is_completed = False self.organization = os.getenv("HF_ORGANIZATION", "yourbench") self.results: Dict[str, Any] = {} self.output_dir = f"uploaded_files/{session_uid}/lighteval_results" # Models to evaluate - can be modified to allow customization self.models = [ ("Qwen/Qwen2.5-72B-Instruct", "novita"), ("Qwen/QwQ-32B", "novita"), ] self._add_log("[INFO] Initializing evaluation task") self._add_log(f"[INFO] Dataset to evaluate: {self.organization}/{dataset_name}") self._add_log(f"[INFO] Output directory: {self.output_dir}") def _add_log(self, message: str) -> None: """ Add a log message to the logs list Args: message: Log message to add """ if message not in self.logs: # Avoid duplicates self.logs.append(message) # Force copy of the list to avoid reference problems self.logs = self.logs.copy() # Record in system logs logger.info(f"[{self.session_uid}] {message}") def get_logs(self) -> List[str]: """ Get all logs for this task Returns: List of log messages """ return self.logs.copy() # Retourner une copie pour éviter les problèmes de référence def is_task_completed(self) -> bool: """ Check if the task is completed Returns: True if completed, False otherwise """ return self.is_completed async def _evaluate_model(self, model_info: Tuple[str, str]) -> Dict[str, Any]: """ Evaluate a specific model Args: model_info: Tuple of (model_name, provider) Returns: Dictionary with evaluation results """ model_name, provider = model_info self._add_log(f"[INFO] Starting evaluation for {model_name} with {provider}") # Create output directory os.makedirs(self.output_dir, exist_ok=True) # Define full dataset path dataset_path = f"{self.organization}/{self.dataset_name}" # Create temporary file temp_file_path = tempfile.mktemp(suffix=".py") self._add_log(f"[INFO] Creating temporary file for {model_name}: {temp_file_path}") with open(temp_file_path, 'w') as temp_file: temp_file.write(f""" import os import sys sys.path.append("{os.getcwd()}") from tasks.yourbench_lighteval_task import create_yourbench_task # Create yourbench task yourbench = create_yourbench_task("{dataset_path}", "lighteval") # Define TASKS_TABLE needed by lighteval TASKS_TABLE = [yourbench] """) # Build lighteval command args cmd_args = [ "lighteval", "endpoint", "inference-providers", f"model={model_name},provider={provider}", "custom|yourbench|0|0", "--custom-tasks", temp_file_path, "--max-samples", "5", "--output-dir", self.output_dir, "--save-details", "--no-push-to-hub" ] self._add_log(f"[INFO] Running command for {model_name}: {' '.join(cmd_args)}") results = { "model_name": model_name, "provider": provider, "success": False, "error": None, "results": None, "return_code": None } try: # Prepare environment with needed tokens env = os.environ.copy() hf_token = os.getenv("HF_TOKEN") if 
hf_token: env["HF_TOKEN"] = hf_token env["HUGGING_FACE_HUB_TOKEN"] = hf_token env["HF_ORGANIZATION"] = self.organization # Run the process asynchronously process = await asyncio.create_subprocess_exec( *cmd_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) # Wait for the process to complete stdout, stderr = await process.communicate() # Store return code exit_code = process.returncode results["return_code"] = exit_code # Log output if stdout: stdout_lines = stdout.decode().strip().split('\n') for line in stdout_lines[:5]: # Log only first 5 lines self._add_log(f"[INFO] {model_name} - {line}") # Log errors if any if stderr and exit_code != 0: stderr_lines = stderr.decode().strip().split('\n') for line in stderr_lines[:5]: # Log only first 5 lines self._add_log(f"[ERROR] {model_name} - {line}") # Find any JSON result files - LightEval organizes by model name in different ways result_files = [] results_dir = Path(self.output_dir) / "results" if results_dir.exists(): # Parcourir récursivement tous les répertoires pour trouver des fichiers JSON for json_file in results_dir.glob("**/*.json"): # Check if the filename or path contains parts of the model name model_parts = [ model_name, # Full name model_name.replace('/', '_'), # Name with / replaced by _ model_name.split('/')[-1] # Just the model name without the organization ] if any(part in str(json_file) for part in model_parts): result_files.append(json_file) # Traiter les fichiers de résultats trouvés if result_files: # Prendre le fichier le plus récent result_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) latest_result = result_files[0] self._add_log(f"[INFO] {model_name} - Found result file: {latest_result}") try: with open(latest_result, 'r') as f: test_results = json.load(f) # Vérifier si les résultats contiennent les informations essentielles if (test_results and isinstance(test_results, dict) and "results" in test_results and "all" in test_results["results"]): # Enregistrer les résultats results["results"] = test_results results["success"] = True # Afficher la précision accuracy = test_results["results"]["all"]["accuracy"] accuracy_stderr = test_results["results"]["all"]["accuracy_stderr"] self._add_log(f"[SUCCESS] {model_name} - Accuracy: {accuracy:.4f} ± {accuracy_stderr:.4f}") else: results["error"] = "Incomplete or unexpected result format" self._add_log(f"[WARNING] {model_name} - Unexpected result format") except (json.JSONDecodeError, KeyError) as e: results["error"] = f"Error reading results: {str(e)}" self._add_log(f"[ERROR] {model_name} - {results['error']}") # Si aucun résultat trouvé if not results["success"]: if exit_code == 0: results["error"] = "Execution completed without error but no results found" self._add_log(f"[WARNING] {model_name} - {results['error']}") else: results["error"] = f"Execution error (code: {exit_code})" self._add_log(f"[ERROR] {model_name} - {results['error']}") except Exception as e: results["error"] = f"Exception: {str(e)}" self._add_log(f"[ERROR] Exception during evaluation of {model_name}: {str(e)}") finally: # Delete temporary file try: os.unlink(temp_file_path) except: pass return results async def _run_evaluations(self) -> List[Dict[str, Any]]: """ Run evaluations for all models Returns: List of evaluation results """ self._add_log(f"[INFO] Starting evaluations for {len(self.models)} models") # Create tasks for each model tasks = [self._evaluate_model(model) for model in self.models] # Run all tasks concurrently and gather results model_results = await 
asyncio.gather(*tasks, return_exceptions=True) # Process results results = [] for i, result in enumerate(model_results): if isinstance(result, Exception): # Handle exception model_name, provider = self.models[i] self._add_log(f"[ERROR] Evaluation failed for {model_name}: {str(result)}") results.append({ "model_name": model_name, "provider": provider, "success": False, "error": str(result), "results": None, "return_code": None }) else: # Valid result results.append(result) return results def _format_comparison_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: """ Format results for easy comparison between models Args: results: List of evaluation results Returns: Dictionary with formatted comparison results """ comparison = { "metadata": { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "dataset": f"{self.organization}/{self.dataset_name}", "total_models_tested": len(results), "successful_tests": len([r for r in results if r["success"]]) }, "models_comparison": [] } # Liste des modèles réussis et des modèles échoués successful_models = [r for r in results if r["success"]] failed_models = [r for r in results if not r["success"]] # Trier les modèles réussis par précision (du plus précis au moins précis) if successful_models: sorted_successful = sorted( successful_models, key=lambda x: x["results"]["results"]["all"]["accuracy"], reverse=True # Du plus grand au plus petit ) else: sorted_successful = [] # Trier les modèles échoués par nom sorted_failed = sorted(failed_models, key=lambda x: x["model_name"]) # Concaténer: d'abord les réussites, puis les échecs sorted_results = sorted_successful + sorted_failed # Créer l'entrée pour chaque modèle for result in sorted_results: model_result = { "model_name": result["model_name"], "provider": result["provider"], "success": result["success"] } if result["success"]: # Ajouter les métriques de précision et temps d'exécution model_result.update({ "accuracy": result["results"]["results"]["all"]["accuracy"], "accuracy_stderr": result["results"]["results"]["all"]["accuracy_stderr"], "evaluation_time": float(result["results"]["config_general"]["total_evaluation_time_secondes"]) }) else: # Ajouter l'erreur model_result["error"] = result.get("error", "Unknown reason") comparison["models_comparison"].append(model_result) return comparison async def _upload_results_to_dataset(self, comparison_results: Dict[str, Any]) -> bool: """ Upload evaluation results to the HuggingFace dataset Args: comparison_results: The formatted comparison results Returns: bool: True if upload succeeded, False otherwise """ try: # Create a timestamp for the results file timestamp = time.strftime("%Y%m%d_%H%M%S") result_filename = f"lighteval_results.json" # Create temporary file for upload temp_file_path = tempfile.mktemp(suffix=".json") with open(temp_file_path, 'w') as f: json.dump(comparison_results, f, indent=2) # Initialize HF API hf_token = os.getenv("HF_TOKEN") if not hf_token: self._add_log("[ERROR] HF_TOKEN not found, cannot upload results to dataset") return False api = HfApi(token=hf_token) dataset_id = f"{self.organization}/{self.dataset_name}" # Prepare the file operation operation = CommitOperationAdd( path_in_repo=f"lighteval_results/{result_filename}", path_or_fileobj=temp_file_path ) # Upload the file self._add_log(f"[INFO] Uploading results to dataset {dataset_id}") api.create_commit( repo_id=dataset_id, repo_type="dataset", operations=[operation], commit_message=f"Add evaluation results from {timestamp}" ) # Cleanup temporary file 
os.unlink(temp_file_path) self._add_log(f"[SUCCESS] Results uploaded to dataset {dataset_id} at lighteval_results/{result_filename}") return True except Exception as e: self._add_log(f"[ERROR] Failed to upload results to dataset: {str(e)}") return False async def _process_evaluation_results(self, results: List[Dict[str, Any]]) -> None: """ Process evaluation results, create summaries and save files Args: results: List of evaluation results """ if results: try: # Save detailed results detailed_output_file = f"{self.output_dir}/detailed_results.json" os.makedirs(os.path.dirname(detailed_output_file), exist_ok=True) with open(detailed_output_file, 'w') as f: json.dump(results, f, indent=2) self._add_log(f"[INFO] Detailed results saved in {detailed_output_file}") # Generate and save comparison results comparison = self._format_comparison_results(results) comparison_file = f"{self.output_dir}/models_comparison.json" with open(comparison_file, 'w') as f: json.dump(comparison, f, indent=2) self._add_log(f"[INFO] Models comparison saved in {comparison_file}") # Upload results to the dataset await self._upload_results_to_dataset(comparison) # Store results for later access self.results = comparison self._add_log("[SUCCESS] Evaluation completed") except Exception as e: self._add_log(f"[ERROR] Error during evaluation execution: {str(e)}") finally: self.is_completed = True def _async_run(self) -> None: """ Run the evaluation asynchronously """ async def run_async(): try: # Run evaluations results = await self._run_evaluations() # Process evaluation results await self._process_evaluation_results(results) except Exception as e: self._add_log(f"[ERROR] Error during evaluation execution: {str(e)}") finally: self.is_completed = True # Create and run the asyncio event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async()) loop.close() def run(self) -> None: """ Run the evaluation task in a separate thread """ self._add_log("[INFO] Starting evaluation") # Run in a separate thread to not block the main thread thread = threading.Thread(target=self._async_run) thread.daemon = True thread.start()
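

# Illustrative usage sketch: how a caller might drive EvaluationTask end to end.
# The session id and dataset name below are hypothetical placeholders, and the
# upload step assumes HF_TOKEN (and optionally HF_ORGANIZATION) is set in the
# environment.
if __name__ == "__main__":
    task = EvaluationTask(session_uid="example-session", dataset_name="example-dataset")
    task.run()  # Starts the evaluation in a background daemon thread

    # Poll until the background thread reports completion, printing new log lines.
    printed = 0
    while not task.is_task_completed():
        logs = task.get_logs()
        for line in logs[printed:]:
            print(line)
        printed = len(logs)
        time.sleep(2)

    # Final comparison results (also saved under the task's output_dir).
    print(json.dumps(task.results, indent=2))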