#!/usr/bin/env python3
"""
Script to test the evaluation task in standalone mode
"""
import os
import sys
import uuid
import json
import time
import argparse
from dotenv import load_dotenv
from pathlib import Path
import traceback

# Ensure the environment is properly configured
load_dotenv()

# Add the current directory to the path to import modules
sys.path.append(os.getcwd())

from tasks.evaluationTask import EvaluationTask


def setup_environment():
    """
    Configure the environment for testing
    """
    # Check if the HF token is defined
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        print("⚠️ HF_TOKEN is not defined in the environment or .env file")
        print("   Please define this variable before continuing.")
        sys.exit(1)

    # Set the default organization if not defined
    if not os.getenv("HF_ORGANIZATION"):
        os.environ["HF_ORGANIZATION"] = "yourbench"
        print("ℹ️ The HF_ORGANIZATION variable is not defined, using 'yourbench' as default")


def run_standalone_evaluation(dataset_name, models=None, max_wait_time=3600):
    """
    Run the evaluation task in standalone mode

    Args:
        dataset_name: Name of the dataset to evaluate
        models: List of models to evaluate (optional)
        max_wait_time: Maximum waiting time in seconds
    """
    # Generate a unique session ID
    session_uid = str(uuid.uuid4())
    print(f"🔧 Session ID: {session_uid}")

    # Create the evaluation task instance
    evaluation_task = EvaluationTask(session_uid, dataset_name)

    # If specific models are provided, use them
    if models:
        evaluation_task.models = models
        print(f"🤖 Using custom models: {models}")

    # Display dataset information
    organization = os.getenv("HF_ORGANIZATION", "yourbench")
    print(f"📊 Evaluating dataset: {organization}/{dataset_name}")
    print(f"💾 Results saved in: {evaluation_task.output_dir}")

    # Start the evaluation task
    print("🚀 Starting evaluation...")
    evaluation_task.run()

    # Wait for the task to complete while displaying logs
    start_time = time.time()
    last_log_count = 0

    while not evaluation_task.is_task_completed():
        current_logs = evaluation_task.get_logs()

        # Display only new logs
        if len(current_logs) > last_log_count:
            for log in current_logs[last_log_count:]:
                print(f"  {log}")
            last_log_count = len(current_logs)

        # Check if the maximum time is reached
        elapsed_time = time.time() - start_time
        if elapsed_time > max_wait_time:
            print("⚠️ Maximum waiting time reached, forced stop")
            break

        time.sleep(1)

    # Check if results are available
    results_file = Path(f"{evaluation_task.output_dir}/models_comparison.json")
    if results_file.exists():
        try:
            with open(results_file, 'r') as f:
                results = json.load(f)

            print("\n📊 Evaluation Results:")
            print(f"   Dataset: {results['metadata']['dataset']}")
            print(f"   Models tested: {results['metadata']['total_models_tested']}")
            print(f"   Successful tests: {results['metadata']['successful_tests']}")
            print(f"   Timestamp: {results['metadata']['timestamp']}")

            if results['metadata']['successful_tests'] > 0:
                print("\n🏆 Model ranking by accuracy:")
                successful_models = [m for m in results['models_comparison'] if m['success']]
                for i, model in enumerate(successful_models):
                    print(f"   {i+1}. ✅ {model['model_name']} ({model['provider']})")
                    print(f"      Accuracy: {model['accuracy']:.4f} ± {model['accuracy_stderr']:.4f}")
                    print(f"      Evaluation time: {model['evaluation_time']:.2f}s")

            failed_models = [m for m in results['models_comparison'] if not m['success']]
            if failed_models:
                print("\n❌ Unevaluated models:")
                for i, model in enumerate(failed_models):
                    print(f"   {i+1}. {model['model_name']} ({model['provider']})")
                    error_msg = model.get('error', 'Unknown reason')
                    print(f"      Reason: {error_msg}")

            # Check detailed results files
            detailed_file = Path(f"{evaluation_task.output_dir}/detailed_results.json")
            if detailed_file.exists():
                print(f"\n📄 Detailed results available in: {detailed_file}")

            # Check raw files
            raw_results = list(Path(f"{evaluation_task.output_dir}/results").glob("**/*.json"))
            if raw_results:
                print(f"\n📁 {len(raw_results)} raw result files available in: {evaluation_task.output_dir}/results")

            print("\n✅ Evaluation completed!")
        except Exception as e:
            print(f"❌ Error reading results: {str(e)}")
            print(f"   Details: {traceback.format_exc()}")
    else:
        print(f"❌ No evaluation results found in {results_file}")

if __name__ == "__main__":
    # Configure the argument parser
    parser = argparse.ArgumentParser(description="Test the evaluation task in standalone mode")
    parser.add_argument("dataset_name", type=str, help="Name of the dataset to evaluate (without the organization)")
    parser.add_argument("--model", action="append", dest="models",
                        help="Model to evaluate in the format 'name/model,provider'. Can be used multiple times.")
    parser.add_argument("--timeout", type=int, default=3600,
                        help="Maximum waiting time in seconds (default: 3600)")
    args = parser.parse_args()

    # Configure the environment
    setup_environment()

    # Transform models into tuples if specified
    models_to_evaluate = None
    if args.models:
        models_to_evaluate = []
        for model_spec in args.models:
            try:
                model_name, provider = model_spec.split(",")
                models_to_evaluate.append((model_name, provider))
            except ValueError:
                print(f"⚠️ Invalid model format: {model_spec}. Use 'name/model,provider'")
                sys.exit(1)

    # Run the evaluation
    run_standalone_evaluation(args.dataset_name, models_to_evaluate, args.timeout)
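
# Example invocation (the script filename and the model/provider values are
# assumptions, shown only to illustrate the CLI format expected above):
#   python test_evaluation.py my_dataset --model "meta-llama/Llama-3.1-8B-Instruct,novita" --timeout 1800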