| """ | |
| Task to run evaluation using lighteval | |
| """ | |
| import os | |
| import time | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| import concurrent.futures | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| import json | |
| import shutil | |
| from typing import List, Dict | |
| from tasks.get_available_model_provider import get_available_model_provider | |
| from huggingface_hub import HfApi | |
| import asyncio | |
| # Valeur par défaut du timeout | |
| DEFAULT_EVALUATION_TIMEOUT = 120.0 # 1 minute par défaut | |
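# Note: this timeout bounds each individual model evaluation (it is applied to the
# asyncio.wait_for call around the lighteval subprocess below), not the whole run,
# and can be overridden per task via the constructor's `timeout` argument.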

class EvaluationTask:
    """
    Task to run evaluation using lighteval
    """

    def __init__(self, session_uid: str, dataset_name: str, clean_old_results: bool = False, timeout: float = None):
        """
        Initialize the evaluation task

        Args:
            session_uid: Session ID for this task
            dataset_name: Name of the dataset to evaluate
            clean_old_results: If True, clean old results before evaluation
            timeout: Timeout in seconds for each model evaluation (if None, uses default)
        """
        self.session_uid = session_uid
        self.dataset_name = dataset_name
        self.is_completed = False
        self.results = []
        self.hf_api = HfApi()
        self.timeout = timeout if timeout is not None else DEFAULT_EVALUATION_TIMEOUT
        self.current_step = "initializing"
        self.completed_steps = []
        self.step_start_time = time.time()  # Record when the current step started

        # Clean old results if requested
        if clean_old_results:
            self.clean_old_results()
    async def update_step(self, step: str) -> None:
        """
        Update the current step and completed steps with a minimum delay of 1 second

        Args:
            step: Name of the step to update
        """
        # Compute the time elapsed since the previous step started
        elapsed_since_step_start = time.time() - self.step_start_time

        # If less than one second has elapsed, wait for the remainder of that second
        if elapsed_since_step_start < 1.0:
            await asyncio.sleep(1.0 - elapsed_since_step_start)

        # Update the current step and record the new timestamp
        self.current_step = step
        self.step_start_time = time.time()

        # Add to the completed steps if not already present
        if step not in self.completed_steps:
            self.completed_steps.append(step)

        print(f"[{datetime.now().strftime('%H:%M:%S')}] Step changed to: {step}")
    def get_progress(self) -> Dict:
        """
        Get the current progress of the task

        Returns:
            Dictionary containing current step and completed steps
        """
        return {
            "current_step": self.current_step,
            "completed_steps": self.completed_steps
        }
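    # Illustrative return value of get_progress() (step names taken from run() below):
    # {"current_step": "evaluating_models",
    #  "completed_steps": ["finding_available_model_providers", "starting_evaluation_process", "evaluating_models"]}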
    def clean_old_results(self) -> None:
        """
        Clean old evaluation results to avoid confusion
        """
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking and cleaning old results...")

        # Path to LightEval results
        results_dir = Path(f"uploaded_files/{self.session_uid}/lighteval_results")

        # Delete if exists
        if results_dir.exists():
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Deleting old LightEval results")
            shutil.rmtree(results_dir)
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning complete")
        else:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] No old results found")

        # Also check for intermediate lighteval results
        if os.path.exists("data/lighteval_results"):
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Cleaning intermediate results")
            try:
                shutil.rmtree("data/lighteval_results", ignore_errors=True)
            except Exception as e:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Error cleaning intermediate results: {str(e)}")
    def _save_results_to_hub(self) -> None:
        """
        Save evaluation results directly to the dataset on the Hub without persisting locally
        """
        try:
            # Sort the results by accuracy (most accurate first)
            sorted_results = sorted(self.results, key=lambda x: x.get('accuracy', 0), reverse=True)

            # Create a temporary file for the results
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
                # Add metadata to the results
                final_results = {
                    "metadata": {
                        "evaluation_date": datetime.now().isoformat(),
                        "session_id": self.session_uid,
                        "dataset_name": self.dataset_name
                    },
                    "results": sorted_results
                }
                json.dump(final_results, temp_file, indent=2)
                temp_file_path = temp_file.name

            # Push to Hub
            self.hf_api.upload_file(
                path_or_fileobj=temp_file_path,
                path_in_repo="lighteval_results.json",
                repo_id=self.dataset_name,
                repo_type="dataset",
                commit_message="Add lighteval evaluation results"
            )

            print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")

            # Remove the temporary file
            os.unlink(temp_file_path)
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")
    async def _run_lighteval(self, model_name: str, provider: str) -> dict:
        start_time = time.time()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")

        # Create temporary task file
        temp_file_path = tempfile.mktemp(suffix=".py")
        with open(temp_file_path, 'w') as temp_file:
            temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{self.dataset_name}", "single_shot_questions")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")

        # Create output directory in the session folder
        output_dir = f"uploaded_files/{self.session_uid}/lighteval_results"
        os.makedirs(output_dir, exist_ok=True)
        # LightEval command
        cmd_args = [
            "lighteval",
            "endpoint",
            "inference-providers",
            f"model={model_name},provider={provider}",
            "custom|yourbench|0|0",
            "--custom-tasks",
            temp_file_path,
            "--max-samples", "30",
            "--output-dir", output_dir,
            "--save-details",
            "--no-push-to-hub"
        ]
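        # Reading of the command above: the yourbench task module written to
        # temp_file_path is passed via --custom-tasks, each run is capped at 30
        # samples, and results land under the session folder given to --output-dir.
        # The task spec "custom|yourbench|0|0" is assumed to follow lighteval's
        # pipe-separated "suite|task|few-shot" convention.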
        try:
            # Run the command with the current environment variables; the output is
            # awaited below with the configured per-model timeout (self.timeout)
            process = await asyncio.create_subprocess_exec(
                *cmd_args,
                env=os.environ,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            try:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Running command: {' '.join(cmd_args)}")
                stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.timeout)

                # Log stdout and stderr
                if stdout:
                    stdout_decoded = stdout.decode('utf-8')
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval STDOUT for {model_name}:")
                    for line in stdout_decoded.splitlines():
                        print(f"[STDOUT] {line}")

                if stderr:
                    stderr_decoded = stderr.decode('utf-8')
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval STDERR for {model_name}:")
                    for line in stderr_decoded.splitlines():
                        print(f"[STDERR] {line}")

                # Check return code
                if process.returncode != 0:
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] LightEval failed with return code {process.returncode}")
            except asyncio.TimeoutError:
                process.kill()
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")

                # Clean up temporary files
                os.unlink(temp_file_path)

                return {
                    "model": model_name,
                    "provider": provider,
                    "accuracy": 0.0,
                    "execution_time": self.timeout,
                    "status": "timeout"
                }
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Error running evaluation for {model_name}: {str(e)}")

            # Clean up temporary files
            os.unlink(temp_file_path)

            return {
                "model": model_name,
                "provider": provider,
                "accuracy": 0.0,
                "execution_time": time.time() - start_time,
                "status": "error"
            }

        # Calculate execution time
        execution_time = time.time() - start_time
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s")
        try:
            # Get results from the output file
            results_dir = Path(output_dir) / "results" / model_name
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Looking for results in {results_dir}")

            if not results_dir.exists():
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Results directory doesn't exist for {model_name}")
                raise FileNotFoundError(f"Results directory not found: {results_dir}")

            results_files = list(results_dir.glob("results_*.json"))
            if not results_files:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] No results files found in {results_dir}")
                raise FileNotFoundError(f"No results files found in {results_dir}")

            results_file = results_files[0]
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Using results file: {results_file}")

            with open(results_file) as f:
                results = json.load(f)
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Results structure: {json.dumps(list(results.keys()))}")

                # Check that the structure is the expected one
                if "results" in results and "all" in results["results"] and "accuracy" in results["results"]["all"]:
                    accuracy = results["results"]["all"]["accuracy"]
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] Extracted accuracy: {accuracy}")
                else:
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] Unexpected results structure. Available keys: {list(results.keys())}")
                    if "results" in results:
                        print(f"[{datetime.now().strftime('%H:%M:%S')}] Keys in 'results': {list(results['results'].keys()) if isinstance(results['results'], dict) else 'not a dictionary'}")
                    raise ValueError(f"Unexpected results structure for {model_name}")

            result_data = {
                "model": model_name,
                "provider": provider,
                "accuracy": accuracy,
                "execution_time": execution_time,
                "status": "success"
            }
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}")
            result_data = {
                "model": model_name,
                "provider": provider,
                "accuracy": 0.0,
                "execution_time": execution_time,
                "status": "parse_error"
            }

        # Clean up temporary files
        os.unlink(temp_file_path)

        return result_data
    async def run(self, clean_first: bool = True) -> None:
        """
        Run the evaluation task asynchronously

        Args:
            clean_first: If True, clean old results before starting (default: True)
        """
        # Clean old results before starting if requested
        if clean_first:
            self.clean_old_results()

        # Start global timer
        script_start_time = time.time()

        # Load environment variables
        load_dotenv()

        # Models to evaluate - only models reachable through an inference provider
        models = [
            "Qwen/QwQ-32B",
            "Qwen/Qwen2.5-72B-Instruct",
            "Qwen/Qwen2.5-32B-Instruct",
            "meta-llama/Llama-3.1-8B-Instruct",
            "meta-llama/Llama-3.3-70B-Instruct",
            "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
            "mistralai/Mistral-Small-24B-Instruct-2501",
        ]

        # Log the dataset structure for inspection
        try:
            from datasets import load_dataset
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Loading dataset {self.dataset_name} for inspection")
            dataset = load_dataset(self.dataset_name, "single_shot_questions", split="train")

            # Check the structure of the first example
            if len(dataset) > 0:
                first_example = dataset[0]
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Structure of the first example:")
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Keys: {first_example.keys()}")
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Citations: {first_example.get('citations', 'not found')}")
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Error while inspecting the dataset: {str(e)}")
        # Step 1: Check available providers for each model
        await self.update_step("finding_available_model_providers")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Checking available providers for models...")

        model_providers = {}
        for model in models:
            provider = get_available_model_provider(model, verbose=True)
            if provider:
                model_providers[model] = provider
            else:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] No available provider found for {model}")

        if not model_providers:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] No models with available providers found")
            return

        print(f"[{datetime.now().strftime('%H:%M:%S')}] Found providers for {len(model_providers)} models")

        # Step 2: Start the evaluation process
        await self.update_step("starting_evaluation_process")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation process...")

        # Step 3: Evaluate the models in parallel
        await self.update_step("evaluating_models")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluating models...")

        tasks = []
        for model, provider in model_providers.items():
            tasks.append(self._run_lighteval(model, provider))

        # Run all evaluations in parallel
        results = await asyncio.gather(*tasks)

        # Filter out failed evaluations
        self.results = [r for r in results if r["status"] == "success"]

        # Step 4: Save results
        await self.update_step("storing_evaluation_results")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Storing evaluation results...")
        self._save_results_to_hub()

        # Mark task as completed
        self.is_completed = True
        await self.update_step("completed")

        total_time = time.time() - script_start_time
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation completed in {total_time:.2f}s")
    def get_logs(self) -> List[str]:
        """
        Get the logs of the task

        Returns:
            List of log messages
        """
        return self.logs if hasattr(self, "logs") else []

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if the task is completed, False otherwise
        """
        return self.is_completed
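

# Minimal usage sketch, assuming the caller runs inside an environment where the
# Hugging Face token in scope can write to the target dataset repo. The values
# "demo-session" and "org/yourbench-dataset" are placeholders, not identifiers
# used by the Space itself.
if __name__ == "__main__":
    demo_task = EvaluationTask(session_uid="demo-session", dataset_name="org/yourbench-dataset", timeout=300.0)
    asyncio.run(demo_task.run())
    print(demo_task.get_progress())
    for entry in demo_task.results:
        print(entry["model"], entry["provider"], entry["accuracy"])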