# demo/backend/tasks/evaluationTask.py
"""
Task to run evaluation using lighteval
"""
import os
import time
import subprocess
import tempfile
from pathlib import Path
import concurrent.futures
from dotenv import load_dotenv
from datetime import datetime
import json
from typing import List, Dict
from tasks.get_model_providers import get_model_providers
from huggingface_hub import HfApi
class EvaluationTask:
"""
Task to run evaluation using lighteval
"""
def __init__(self, session_uid: str, dataset_name: str):
"""
Initialize the evaluation task
Args:
session_uid: Session ID for this task
dataset_name: Name of the dataset to evaluate
"""
self.session_uid = session_uid
self.dataset_name = dataset_name
self.is_completed = False
self.results = []
self.hf_api = HfApi()
def _save_results_to_hub(self) -> None:
"""
Save evaluation results to the dataset on the Hub
"""
try:
# Create results directory if it doesn't exist
results_dir = Path("data/lighteval_results")
results_dir.mkdir(parents=True, exist_ok=True)
# Save results to JSON file
results_file = results_dir / "lighteval_results.json"
with open(results_file, "w") as f:
json.dump(self.results, f, indent=2)
# Push to Hub
self.hf_api.upload_file(
path_or_fileobj=str(results_file),
path_in_repo="lighteval_results.json",
repo_id=self.dataset_name,
repo_type="dataset",
commit_message="Add lighteval evaluation results"
)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")
def _run_lighteval(self, model_name: str, provider: str, dataset_name: str) -> dict:
start_time = time.time()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")
        # Create a temporary task file (mkstemp instead of the deprecated, race-prone mktemp)
        fd, temp_file_path = tempfile.mkstemp(suffix=".py")
        with os.fdopen(fd, "w") as temp_file:
temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task
# Create yourbench task
yourbench = create_yourbench_task("{dataset_name}", "multi_hop_questions")
# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
# LightEval command
cmd_args = [
"lighteval",
"endpoint",
"inference-providers",
f"model={model_name},provider={provider}",
"custom|yourbench|0|0",
"--custom-tasks",
temp_file_path,
"--max-samples", "30",
"--output-dir", "data/lighteval_results",
# "--save-details",
"--no-push-to-hub"
]
try:
# Run the command with environment variables and timeout of 60 seconds
subprocess.run(cmd_args, env=os.environ, timeout=60)
except subprocess.TimeoutExpired:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": 60.0,
"status": "timeout"
}
# Calculate execution time
execution_time = time.time() - start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s")
# Clean up
os.unlink(temp_file_path)
try:
# Get results from the output file
            # lighteval writes per-model results under results/<org>/<model>
            results_dir = Path("data/lighteval_results/results") / model_name
results_file = next(results_dir.glob("results_*.json"))
with open(results_file) as f:
results = json.load(f)
accuracy = results["results"]["all"]["accuracy"]
return {
"model": model_name,
"provider": provider,
"accuracy": accuracy,
"execution_time": execution_time,
"status": "success"
}
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}")
return {
"model": model_name,
"provider": provider,
"accuracy": 0.0,
"execution_time": execution_time,
"status": "parse_error"
}
def run_parallel(self) -> List[Dict]:
"""
Run the evaluation task with multiple models in parallel using ProcessPoolExecutor
Returns:
List of results for each model
"""
# Start global timer
script_start_time = time.time()
# Load environment variables
load_dotenv()
# Models to evaluate
models = [
"Qwen/QwQ-32B",
"Qwen/Qwen2.5-72B-Instruct",
"deepseek-ai/DeepSeek-V3-0324",
"deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
]
# Get providers for each model
model_providers = get_model_providers(models)
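        # get_model_providers returns (model_name, providers) pairs; the first available provider is used for each model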
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting parallel evaluations")
# Run evaluations in parallel using ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [
executor.submit(self._run_lighteval, model_name, providers[0], self.dataset_name)
for model_name, providers in model_providers
if providers # Only run if providers are available
]
self.results = [future.result() for future in concurrent.futures.as_completed(futures)]
# Calculate total script execution time
total_time = time.time() - script_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] All evaluations completed in {total_time:.2f}s")
# Save results to Hub
self._save_results_to_hub()
# Mark the task as completed
self.is_completed = True
return self.results
def get_logs(self) -> List[str]:
"""
Get logs for this task (empty list since we don't track logs anymore)
Returns:
Empty list of logs
"""
return []
def is_task_completed(self) -> bool:
"""
Check if the task is completed
Returns:
True if completed, False otherwise
"""
return self.is_completed
def run(self) -> None:
"""
Run the evaluation task (wrapper around run_parallel)
"""
self.run_parallel()
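# Example usage: a minimal sketch of running the task directly.
# "my-org/my-yourbench-dataset" is a placeholder repo id (any dataset you can write to),
# and a valid HF token is expected in the environment for the inference providers and the Hub upload.
if __name__ == "__main__":
    task = EvaluationTask(session_uid="local-test", dataset_name="my-org/my-yourbench-dataset")
    results = task.run_parallel()
    for result in results:
        print(f"{result['model']} ({result['provider']}): {result['status']}, accuracy={result['accuracy']}")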