Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
""" | |
Task to run evaluation using lighteval | |
""" | |
import os | |
import time | |
import subprocess | |
import tempfile | |
from pathlib import Path | |
import concurrent.futures | |
from dotenv import load_dotenv | |
from datetime import datetime | |
import json | |
from typing import List, Dict | |
from tasks.get_model_providers import get_model_providers | |
from huggingface_hub import HfApi | |
class EvaluationTask: | |
""" | |
Task to run evaluation using lighteval | |
""" | |
def __init__(self, session_uid: str, dataset_name: str): | |
""" | |
Initialize the evaluation task | |
Args: | |
session_uid: Session ID for this task | |
dataset_name: Name of the dataset to evaluate | |
""" | |
self.session_uid = session_uid | |
self.dataset_name = dataset_name | |
self.is_completed = False | |
self.results = [] | |
self.hf_api = HfApi() | |
def _save_results_to_hub(self) -> None: | |
""" | |
Save evaluation results to the dataset on the Hub | |
""" | |
try: | |
# Create results directory if it doesn't exist | |
results_dir = Path("data/lighteval_results") | |
results_dir.mkdir(parents=True, exist_ok=True) | |
# Save results to JSON file | |
results_file = results_dir / "lighteval_results.json" | |
with open(results_file, "w") as f: | |
json.dump(self.results, f, indent=2) | |
# Push to Hub | |
self.hf_api.upload_file( | |
path_or_fileobj=str(results_file), | |
path_in_repo="lighteval_results.json", | |
repo_id=self.dataset_name, | |
repo_type="dataset", | |
commit_message="Add lighteval evaluation results" | |
) | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json") | |
except Exception as e: | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}") | |
def _run_lighteval(self, model_name: str, provider: str, dataset_name: str) -> dict: | |
start_time = time.time() | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}") | |
# Create temporary task file | |
temp_file_path = tempfile.mktemp(suffix=".py") | |
with open(temp_file_path, 'w') as temp_file: | |
temp_file.write(f""" | |
from lighteval_task.lighteval_task import create_yourbench_task | |
# Create yourbench task | |
yourbench = create_yourbench_task("{dataset_name}", "multi_hop_questions") | |
# Define TASKS_TABLE needed by lighteval | |
TASKS_TABLE = [yourbench] | |
""") | |
# LightEval command | |
cmd_args = [ | |
"lighteval", | |
"endpoint", | |
"inference-providers", | |
f"model={model_name},provider={provider}", | |
"custom|yourbench|0|0", | |
"--custom-tasks", | |
temp_file_path, | |
"--max-samples", "30", | |
"--output-dir", "data/lighteval_results", | |
# "--save-details", | |
"--no-push-to-hub" | |
] | |
try: | |
# Run the command with environment variables and timeout of 60 seconds | |
subprocess.run(cmd_args, env=os.environ, timeout=60) | |
except subprocess.TimeoutExpired: | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s") | |
return { | |
"model": model_name, | |
"provider": provider, | |
"accuracy": 0.0, | |
"execution_time": 60.0, | |
"status": "timeout" | |
} | |
# Calculate execution time | |
execution_time = time.time() - start_time | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s") | |
# Clean up | |
os.unlink(temp_file_path) | |
try: | |
# Get results from the output file | |
results_dir = Path("data/lighteval_results/results") / model_name.replace("/", "/") | |
results_file = next(results_dir.glob("results_*.json")) | |
with open(results_file) as f: | |
results = json.load(f) | |
accuracy = results["results"]["all"]["accuracy"] | |
return { | |
"model": model_name, | |
"provider": provider, | |
"accuracy": accuracy, | |
"execution_time": execution_time, | |
"status": "success" | |
} | |
except Exception as e: | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}") | |
return { | |
"model": model_name, | |
"provider": provider, | |
"accuracy": 0.0, | |
"execution_time": execution_time, | |
"status": "parse_error" | |
} | |
def run_parallel(self) -> List[Dict]: | |
""" | |
Run the evaluation task with multiple models in parallel using ProcessPoolExecutor | |
Returns: | |
List of results for each model | |
""" | |
# Start global timer | |
script_start_time = time.time() | |
# Load environment variables | |
load_dotenv() | |
# Models to evaluate | |
models = [ | |
"Qwen/QwQ-32B", | |
"Qwen/Qwen2.5-72B-Instruct", | |
"deepseek-ai/DeepSeek-V3-0324", | |
"deepseek-ai/DeepSeek-R1-Distill-Llama-70B", | |
] | |
# Get providers for each model | |
model_providers = get_model_providers(models) | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting parallel evaluations") | |
# Run evaluations in parallel using ProcessPoolExecutor | |
with concurrent.futures.ProcessPoolExecutor() as executor: | |
futures = [ | |
executor.submit(self._run_lighteval, model_name, providers[0], self.dataset_name) | |
for model_name, providers in model_providers | |
if providers # Only run if providers are available | |
] | |
self.results = [future.result() for future in concurrent.futures.as_completed(futures)] | |
# Calculate total script execution time | |
total_time = time.time() - script_start_time | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] All evaluations completed in {total_time:.2f}s") | |
# Save results to Hub | |
self._save_results_to_hub() | |
# Mark the task as completed | |
self.is_completed = True | |
return self.results | |
def get_logs(self) -> List[str]: | |
""" | |
Get logs for this task (empty list since we don't track logs anymore) | |
Returns: | |
Empty list of logs | |
""" | |
return [] | |
def is_task_completed(self) -> bool: | |
""" | |
Check if the task is completed | |
Returns: | |
True if completed, False otherwise | |
""" | |
return self.is_completed | |
def run(self) -> None: | |
""" | |
Run the evaluation task (wrapper around run_parallel) | |
""" | |
self.run_parallel() |