""" | |
Task to evaluate models on a YourbBench dataset using LightEval | |
""" | |
import os
import sys
import json
import time
import tempfile
import asyncio
import threading
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple

from loguru import logger
from huggingface_hub import HfApi, CommitOperationAdd

from tasks.yourbench_lighteval_task import create_yourbench_task


class EvaluationTask:
    """
    Task to evaluate models using LightEval on a YourBench dataset
    """

    def __init__(self, session_uid: str, dataset_name: str):
        """
        Initialize the evaluation task

        Args:
            session_uid: Session ID for this task
            dataset_name: Name of the dataset to evaluate
        """
        self.session_uid = session_uid
        self.dataset_name = dataset_name
        self.logs: List[str] = []
        self.is_completed = False
        self.organization = os.getenv("HF_ORGANIZATION", "yourbench")
        self.results: Dict[str, Any] = {}
        self.output_dir = f"uploaded_files/{session_uid}/lighteval_results"

        # Models to evaluate - can be modified to allow customization
        self.models = [
            ("Qwen/Qwen2.5-72B-Instruct", "novita"),
            ("Qwen/QwQ-32B", "novita"),
        ]

        self._add_log("[INFO] Initializing evaluation task")
        self._add_log(f"[INFO] Dataset to evaluate: {self.organization}/{dataset_name}")
        self._add_log(f"[INFO] Output directory: {self.output_dir}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list

        Args:
            message: Log message to add
        """
        if message not in self.logs:  # Avoid duplicates
            self.logs.append(message)
            # Force a copy of the list to avoid reference problems
            self.logs = self.logs.copy()
            # Record in system logs
            logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task

        Returns:
            List of log messages
        """
        return self.logs.copy()  # Return a copy to avoid reference problems

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    async def _evaluate_model(self, model_info: Tuple[str, str]) -> Dict[str, Any]:
        """
        Evaluate a specific model

        Args:
            model_info: Tuple of (model_name, provider)

        Returns:
            Dictionary with evaluation results
        """
        model_name, provider = model_info
        self._add_log(f"[INFO] Starting evaluation for {model_name} with {provider}")

        # Create output directory
        os.makedirs(self.output_dir, exist_ok=True)

        # Define full dataset path
        dataset_path = f"{self.organization}/{self.dataset_name}"

        # Create temporary file
        temp_file_path = tempfile.mktemp(suffix=".py")
        self._add_log(f"[INFO] Creating temporary file for {model_name}: {temp_file_path}")

        with open(temp_file_path, 'w') as temp_file:
            temp_file.write(f"""
import os
import sys
sys.path.append("{os.getcwd()}")

from tasks.yourbench_lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{dataset_path}", "lighteval")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")

        # Build lighteval command args
        cmd_args = [
            "lighteval",
            "endpoint",
            "inference-providers",
            f"model={model_name},provider={provider}",
            "custom|yourbench|0|0",
            "--custom-tasks",
            temp_file_path,
            "--max-samples", "5",
            "--output-dir", self.output_dir,
            "--save-details",
            "--no-push-to-hub"
        ]
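        # Note: "custom|yourbench|0|0" is a LightEval task spec; assuming the usual
        # "suite|task|num_fewshot|truncate_fewshot" convention, it runs the custom
        # "yourbench" task with zero few-shot examples.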
self._add_log(f"[INFO] Running command for {model_name}: {' '.join(cmd_args)}") | |
results = { | |
"model_name": model_name, | |
"provider": provider, | |
"success": False, | |
"error": None, | |
"results": None, | |
"return_code": None | |
} | |

        try:
            # Prepare environment with needed tokens
            env = os.environ.copy()
            hf_token = os.getenv("HF_TOKEN")
            if hf_token:
                env["HF_TOKEN"] = hf_token
                env["HUGGING_FACE_HUB_TOKEN"] = hf_token
            env["HF_ORGANIZATION"] = self.organization

            # Run the process asynchronously
            process = await asyncio.create_subprocess_exec(
                *cmd_args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )

            # Wait for the process to complete
            stdout, stderr = await process.communicate()

            # Store return code
            exit_code = process.returncode
            results["return_code"] = exit_code

            # Log output
            if stdout:
                stdout_lines = stdout.decode().strip().split('\n')
                for line in stdout_lines[:5]:  # Log only first 5 lines
                    self._add_log(f"[INFO] {model_name} - {line}")

            # Log errors if any
            if stderr and exit_code != 0:
                stderr_lines = stderr.decode().strip().split('\n')
                for line in stderr_lines[:5]:  # Log only first 5 lines
                    self._add_log(f"[ERROR] {model_name} - {line}")

            # Find any JSON result files - LightEval organizes results by model name in different ways
            result_files = []
            results_dir = Path(self.output_dir) / "results"
            if results_dir.exists():
                # Recursively walk all directories to find JSON files
                for json_file in results_dir.glob("**/*.json"):
                    # Check if the filename or path contains parts of the model name
                    model_parts = [
                        model_name,                    # Full name
                        model_name.replace('/', '_'),  # Name with / replaced by _
                        model_name.split('/')[-1]      # Just the model name without the organization
                    ]
                    if any(part in str(json_file) for part in model_parts):
                        result_files.append(json_file)

            # Process the result files that were found
            if result_files:
                # Take the most recent file
                result_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
                latest_result = result_files[0]
                self._add_log(f"[INFO] {model_name} - Found result file: {latest_result}")

                try:
                    with open(latest_result, 'r') as f:
                        test_results = json.load(f)

                    # Check that the results contain the essential information
                    if (test_results and
                            isinstance(test_results, dict) and
                            "results" in test_results and
                            "all" in test_results["results"]):
                        # Store the results
                        results["results"] = test_results
                        results["success"] = True

                        # Report the accuracy
                        accuracy = test_results["results"]["all"]["accuracy"]
                        accuracy_stderr = test_results["results"]["all"]["accuracy_stderr"]
                        self._add_log(f"[SUCCESS] {model_name} - Accuracy: {accuracy:.4f} ± {accuracy_stderr:.4f}")
                    else:
                        results["error"] = "Incomplete or unexpected result format"
                        self._add_log(f"[WARNING] {model_name} - Unexpected result format")
                except (json.JSONDecodeError, KeyError) as e:
                    results["error"] = f"Error reading results: {str(e)}"
                    self._add_log(f"[ERROR] {model_name} - {results['error']}")

            # If no results were found
            if not results["success"]:
                if exit_code == 0:
                    results["error"] = "Execution completed without error but no results found"
                    self._add_log(f"[WARNING] {model_name} - {results['error']}")
                else:
                    results["error"] = f"Execution error (code: {exit_code})"
                    self._add_log(f"[ERROR] {model_name} - {results['error']}")
        except Exception as e:
            results["error"] = f"Exception: {str(e)}"
            self._add_log(f"[ERROR] Exception during evaluation of {model_name}: {str(e)}")
        finally:
            # Delete the temporary file
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass

        return results

    async def _run_evaluations(self) -> List[Dict[str, Any]]:
        """
        Run evaluations for all models

        Returns:
            List of evaluation results
        """
        self._add_log(f"[INFO] Starting evaluations for {len(self.models)} models")

        # Create tasks for each model
        tasks = [self._evaluate_model(model) for model in self.models]

        # Run all tasks concurrently and gather results
        model_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        results = []
        for i, result in enumerate(model_results):
            if isinstance(result, Exception):
                # Handle exception
                model_name, provider = self.models[i]
                self._add_log(f"[ERROR] Evaluation failed for {model_name}: {str(result)}")
                results.append({
                    "model_name": model_name,
                    "provider": provider,
                    "success": False,
                    "error": str(result),
                    "results": None,
                    "return_code": None
                })
            else:
                # Valid result
                results.append(result)

        return results

    def _format_comparison_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Format results for easy comparison between models

        Args:
            results: List of evaluation results

        Returns:
            Dictionary with formatted comparison results
        """
        comparison = {
            "metadata": {
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "dataset": f"{self.organization}/{self.dataset_name}",
                "total_models_tested": len(results),
                "successful_tests": len([r for r in results if r["success"]])
            },
            "models_comparison": []
        }

        # Split models into successful and failed ones
        successful_models = [r for r in results if r["success"]]
        failed_models = [r for r in results if not r["success"]]

        # Sort successful models by accuracy (most accurate first)
        if successful_models:
            sorted_successful = sorted(
                successful_models,
                key=lambda x: x["results"]["results"]["all"]["accuracy"],
                reverse=True  # From highest to lowest
            )
        else:
            sorted_successful = []

        # Sort failed models by name
        sorted_failed = sorted(failed_models, key=lambda x: x["model_name"])

        # Concatenate: successes first, then failures
        sorted_results = sorted_successful + sorted_failed

        # Create an entry for each model
        for result in sorted_results:
            model_result = {
                "model_name": result["model_name"],
                "provider": result["provider"],
                "success": result["success"]
            }

            if result["success"]:
                # Add accuracy metrics and evaluation time
                model_result.update({
                    "accuracy": result["results"]["results"]["all"]["accuracy"],
                    "accuracy_stderr": result["results"]["results"]["all"]["accuracy_stderr"],
                    "evaluation_time": float(result["results"]["config_general"]["total_evaluation_time_secondes"])
                })
            else:
                # Add the error
                model_result["error"] = result.get("error", "Unknown reason")

            comparison["models_comparison"].append(model_result)

        return comparison

    async def _upload_results_to_dataset(self, comparison_results: Dict[str, Any]) -> bool:
        """
        Upload evaluation results to the HuggingFace dataset

        Args:
            comparison_results: The formatted comparison results

        Returns:
            bool: True if upload succeeded, False otherwise
        """
        try:
            # Timestamp used in the commit message
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            result_filename = "lighteval_results.json"

            # Create temporary file for upload
            temp_file_path = tempfile.mktemp(suffix=".json")
            with open(temp_file_path, 'w') as f:
                json.dump(comparison_results, f, indent=2)

            # Initialize HF API
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                self._add_log("[ERROR] HF_TOKEN not found, cannot upload results to dataset")
                return False

            api = HfApi(token=hf_token)
            dataset_id = f"{self.organization}/{self.dataset_name}"

            # Prepare the file operation
            operation = CommitOperationAdd(
                path_in_repo=f"lighteval_results/{result_filename}",
                path_or_fileobj=temp_file_path
            )

            # Upload the file
            self._add_log(f"[INFO] Uploading results to dataset {dataset_id}")
            api.create_commit(
                repo_id=dataset_id,
                repo_type="dataset",
                operations=[operation],
                commit_message=f"Add evaluation results from {timestamp}"
            )

            # Cleanup temporary file
            os.unlink(temp_file_path)

            self._add_log(f"[SUCCESS] Results uploaded to dataset {dataset_id} at lighteval_results/{result_filename}")
            return True
        except Exception as e:
            self._add_log(f"[ERROR] Failed to upload results to dataset: {str(e)}")
            return False

    async def _process_evaluation_results(self, results: List[Dict[str, Any]]) -> None:
        """
        Process evaluation results, create summaries and save files

        Args:
            results: List of evaluation results
        """
        if results:
            try:
                # Save detailed results
                detailed_output_file = f"{self.output_dir}/detailed_results.json"
                os.makedirs(os.path.dirname(detailed_output_file), exist_ok=True)
                with open(detailed_output_file, 'w') as f:
                    json.dump(results, f, indent=2)
                self._add_log(f"[INFO] Detailed results saved in {detailed_output_file}")

                # Generate and save comparison results
                comparison = self._format_comparison_results(results)
                comparison_file = f"{self.output_dir}/models_comparison.json"
                with open(comparison_file, 'w') as f:
                    json.dump(comparison, f, indent=2)
                self._add_log(f"[INFO] Models comparison saved in {comparison_file}")

                # Upload results to the dataset
                await self._upload_results_to_dataset(comparison)

                # Store results for later access
                self.results = comparison

                self._add_log("[SUCCESS] Evaluation completed")
            except Exception as e:
                self._add_log(f"[ERROR] Error while processing evaluation results: {str(e)}")
            finally:
                self.is_completed = True

    def _async_run(self) -> None:
        """
        Run the evaluation asynchronously
        """
        async def run_async():
            try:
                # Run evaluations
                results = await self._run_evaluations()

                # Process evaluation results
                await self._process_evaluation_results(results)
            except Exception as e:
                self._add_log(f"[ERROR] Error during evaluation execution: {str(e)}")
            finally:
                self.is_completed = True

        # Create and run the asyncio event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(run_async())
        loop.close()

    def run(self) -> None:
        """
        Run the evaluation task in a separate thread
        """
        self._add_log("[INFO] Starting evaluation")

        # Run in a separate thread to not block the main thread
        thread = threading.Thread(target=self._async_run)
        thread.daemon = True
        thread.start()
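

# Example usage: a minimal sketch of how this task is meant to be driven. It assumes
# HF_TOKEN (and optionally HF_ORGANIZATION) are set in the environment and that the
# named YourBench dataset already exists under that organization; the dataset name
# and polling interval below are illustrative placeholders only.
if __name__ == "__main__":
    import uuid

    task = EvaluationTask(session_uid=str(uuid.uuid4()), dataset_name="my-yourbench-dataset")
    task.run()

    # run() returns immediately; the evaluation runs in a daemon thread, so poll
    # until the task reports completion, then print the collected logs.
    while not task.is_task_completed():
        time.sleep(5)

    for log_line in task.get_logs():
        print(log_line)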