""" Task to run evaluation using lighteval """ import os import time import subprocess import tempfile from pathlib import Path import concurrent.futures from dotenv import load_dotenv from datetime import datetime import json from typing import List, Dict from tasks.get_model_providers import get_model_providers from huggingface_hub import HfApi class EvaluationTask: """ Task to run evaluation using lighteval """ def __init__(self, session_uid: str, dataset_name: str): """ Initialize the evaluation task Args: session_uid: Session ID for this task dataset_name: Name of the dataset to evaluate """ self.session_uid = session_uid self.dataset_name = dataset_name self.is_completed = False self.results = [] self.hf_api = HfApi() def _save_results_to_hub(self) -> None: """ Save evaluation results to the dataset on the Hub """ try: # Create results directory if it doesn't exist results_dir = Path("data/lighteval_results") results_dir.mkdir(parents=True, exist_ok=True) # Save results to JSON file results_file = results_dir / "lighteval_results.json" with open(results_file, "w") as f: json.dump(self.results, f, indent=2) # Push to Hub self.hf_api.upload_file( path_or_fileobj=str(results_file), path_in_repo="lighteval_results.json", repo_id=self.dataset_name, repo_type="dataset", commit_message="Add lighteval evaluation results" ) print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json") except Exception as e: print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}") def _run_lighteval(self, model_name: str, provider: str, dataset_name: str) -> dict: start_time = time.time() print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}") # Create temporary task file temp_file_path = tempfile.mktemp(suffix=".py") with open(temp_file_path, 'w') as temp_file: temp_file.write(f""" from lighteval_task.lighteval_task import create_yourbench_task # Create yourbench task yourbench = create_yourbench_task("{dataset_name}", "multi_hop_questions") # Define TASKS_TABLE needed by lighteval TASKS_TABLE = [yourbench] """) # LightEval command cmd_args = [ "lighteval", "endpoint", "inference-providers", f"model={model_name},provider={provider}", "custom|yourbench|0|0", "--custom-tasks", temp_file_path, "--max-samples", "30", "--output-dir", "data/lighteval_results", # "--save-details", "--no-push-to-hub" ] try: # Run the command with environment variables and timeout of 60 seconds subprocess.run(cmd_args, env=os.environ, timeout=60) except subprocess.TimeoutExpired: print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s") return { "model": model_name, "provider": provider, "accuracy": 0.0, "execution_time": 60.0, "status": "timeout" } # Calculate execution time execution_time = time.time() - start_time print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s") # Clean up os.unlink(temp_file_path) try: # Get results from the output file results_dir = Path("data/lighteval_results/results") / model_name.replace("/", "/") results_file = next(results_dir.glob("results_*.json")) with open(results_file) as f: results = json.load(f) accuracy = results["results"]["all"]["accuracy"] return { "model": model_name, "provider": provider, "accuracy": accuracy, "execution_time": execution_time, "status": "success" } except Exception as e: print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}") return { "model": model_name, "provider": provider, "accuracy": 0.0, "execution_time": execution_time, "status": "parse_error" } def run_parallel(self) -> List[Dict]: """ Run the evaluation task with multiple models in parallel using ProcessPoolExecutor Returns: List of results for each model """ # Start global timer script_start_time = time.time() # Load environment variables load_dotenv() # Models to evaluate models = [ "Qwen/QwQ-32B", "Qwen/Qwen2.5-72B-Instruct", "deepseek-ai/DeepSeek-V3-0324", "deepseek-ai/DeepSeek-R1-Distill-Llama-70B", ] # Get providers for each model model_providers = get_model_providers(models) print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting parallel evaluations") # Run evaluations in parallel using ProcessPoolExecutor with concurrent.futures.ProcessPoolExecutor() as executor: futures = [ executor.submit(self._run_lighteval, model_name, providers[0], self.dataset_name) for model_name, providers in model_providers if providers # Only run if providers are available ] self.results = [future.result() for future in concurrent.futures.as_completed(futures)] # Calculate total script execution time total_time = time.time() - script_start_time print(f"[{datetime.now().strftime('%H:%M:%S')}] All evaluations completed in {total_time:.2f}s") # Save results to Hub self._save_results_to_hub() # Mark the task as completed self.is_completed = True return self.results def get_logs(self) -> List[str]: """ Get logs for this task (empty list since we don't track logs anymore) Returns: Empty list of logs """ return [] def is_task_completed(self) -> bool: """ Check if the task is completed Returns: True if completed, False otherwise """ return self.is_completed def run(self) -> None: """ Run the evaluation task (wrapper around run_parallel) """ self.run_parallel()