#!/usr/bin/env python3
"""
Script to run lighteval tests in parallel for multiple models
"""
import os
import sys
import json
import time
import tempfile
import asyncio
from pathlib import Path
from typing import Tuple, List, Dict, Any

# Ensure environment is properly configured
from dotenv import load_dotenv
load_dotenv()

# Import yourbench task module
sys.path.append(os.getcwd())
from tasks.yourbench_lighteval_task import create_yourbench_task

# Define models to test
INIT_MODELS = [
    # 70B
    ("Qwen/Qwen2.5-72B-Instruct", "novita"),
    ("meta-llama/Llama-3.3-70B-Instruct", "novita"),
    ("deepseek-ai/DeepSeek-R1-Distill-Llama-70B", "novita"),
    # 20 to 30B
    ("Qwen/QwQ-32B", "novita"),
    # ("mistralai/Mistral-Small-24B-Instruct-2501", "sambanova"),
]

async def run_lighteval_test_for_model(model_info: Tuple[str, str]) -> Dict[str, Any]:
    """
    Run lighteval test for a specific model
    """
    model_name, provider = model_info

    # Parameters
    dataset_name = "yourbench_a"
    organization = "yourbench"
    output_dir = f"uploaded_files/test_parallel_{provider}/lighteval_results"

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Define full dataset path
    dataset_path = f"{organization}/{dataset_name}"
    print(f"Dataset to evaluate for {model_name}: {dataset_path}")

    # Create a temporary custom-task module for lighteval
    # (NamedTemporaryFile with delete=False replaces the deprecated tempfile.mktemp)
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as temp_file:
        temp_file_path = temp_file.name
        print(f"Creating temporary file for {model_name}: {temp_file_path}")
        temp_file.write(f"""
import os
import sys
sys.path.append("{os.getcwd()}")

from tasks.yourbench_lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{dataset_path}", "lighteval")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
    # Build lighteval command args
    cmd_args = [
        "lighteval",
        "endpoint",
        "inference-providers",
        f"model={model_name},provider={provider}",
        "custom|yourbench|0|0",  # task spec: suite|task|num few-shot|truncate few-shot flag
        "--custom-tasks",
        temp_file_path,
        "--max-samples", "5",
        "--output-dir", output_dir,
        "--save-details",
        "--no-push-to-hub"
    ]

    print(f"Running command for {model_name}: {' '.join(cmd_args)}")
    print(f"Start time for {model_name}: {time.strftime('%H:%M:%S')}")

    results = {
        "model_name": model_name,
        "provider": provider,
        "success": False,
        "error": None,
        "results": None,
        "return_code": None
    }
    try:
        # Prepare environment with needed tokens
        env = os.environ.copy()
        hf_token = os.getenv("HF_TOKEN")
        if hf_token:
            env["HF_TOKEN"] = hf_token
            env["HUGGING_FACE_HUB_TOKEN"] = hf_token
        env["HF_ORGANIZATION"] = organization

        # Run the process asynchronously
        process = await asyncio.create_subprocess_exec(
            *cmd_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        # Wait for the process to complete
        stdout, stderr = await process.communicate()

        # Store return code
        results["return_code"] = process.returncode

        # Log some output for debugging
        if stdout:
            stdout_lines = stdout.decode().strip().split('\n')
            if stdout_lines:
                print(f"Output from {model_name}: {stdout_lines[0]}")

        # Check if results were generated
        results_dir = Path(output_dir) / "results"
        if results_dir.exists():
            result_files = list(results_dir.glob("**/*.json"))
            if result_files:
                # Read the first results file
                with open(result_files[0], 'r') as f:
                    test_results = json.load(f)
                results["results"] = test_results
                results["success"] = True
    except asyncio.CancelledError:
        results["error"] = "Task cancelled"
        print(f"Task cancelled for {model_name}")
    except Exception as e:
        results["error"] = f"Exception: {str(e)}"
        print(f"Error running test for {model_name}: {str(e)}")
    finally:
        # Delete temporary file
        try:
            os.unlink(temp_file_path)
        except OSError:
            pass
        print(f"End time for {model_name}: {time.strftime('%H:%M:%S')}")

    return results

async def run_parallel_tests(models: List[Tuple[str, str]]) -> List[Dict[str, Any]]:
    """
    Run tests in parallel for multiple models using asyncio
    """
    print(f"Starting parallel tests for {len(models)} models")

    # Create tasks for each model
    tasks = [run_lighteval_test_for_model(model) for model in models]

    # Run all tasks concurrently and gather results
    model_results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results
    results = []
    for i, result in enumerate(model_results):
        if isinstance(result, Exception):
            # Handle exception
            model_name, provider = models[i]
            print(f"Test failed for {model_name}: {str(result)}")
            results.append({
                "model_name": model_name,
                "provider": provider,
                "success": False,
                "error": str(result),
                "results": None,
                "return_code": None
            })
        else:
            # Valid result
            results.append(result)
            print(f"Test completed for {result['model_name']}")

    return results

def format_comparison_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Format results for easy comparison between models
    """
    comparison = {
        "metadata": {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "total_models_tested": len(results),
            "successful_tests": len([r for r in results if r["success"]])
        },
        "models_comparison": []
    }

    # Sort models by accuracy (if available) or name
    sorted_results = sorted(
        results,
        key=lambda x: (
            x["results"]["results"]["all"]["accuracy"] if x["success"] and x["results"] else -1,
            x["model_name"]
        ),
        reverse=True
    )

    for result in sorted_results:
        model_result = {
            "model_name": result["model_name"],
            "provider": result["provider"],
            "success": result["success"]
        }
        if result["success"] and result["results"]:
            model_result.update({
                "accuracy": result["results"]["results"]["all"]["accuracy"],
                "accuracy_stderr": result["results"]["results"]["all"]["accuracy_stderr"],
                "evaluation_time": float(result["results"]["config_general"]["total_evaluation_time_secondes"])
            })
        else:
            model_result["error"] = result["error"]
        comparison["models_comparison"].append(model_result)

    return comparison

async def main_async():
    """
    Async main function to run parallel tests
    """
    print("Starting parallel lighteval tests")
    start_time = time.time()

    # Run tests in parallel
    results = await run_parallel_tests(INIT_MODELS)

    # Save detailed results
    detailed_output_file = "parallel_test_detailed_results.json"
    with open(detailed_output_file, 'w') as f:
        json.dump(results, f, indent=2)

    # Generate and save comparison results
    comparison = format_comparison_results(results)
    comparison_file = "models_comparison.json"
    with open(comparison_file, 'w') as f:
        json.dump(comparison, f, indent=2)

    # Print summary
    print("\nTest Summary:")
    for model in comparison["models_comparison"]:
        status = "✅" if model["success"] else "❌"
        print(f"{status} {model['model_name']} ({model['provider']})")
        if not model["success"]:
            print(f"  Error: {model['error']}")
        else:
            print(f"  Accuracy: {model['accuracy']:.2%} (±{model['accuracy_stderr']:.2%})")
            print(f"  Evaluation time: {model['evaluation_time']:.2f}s")

    duration = time.time() - start_time
    print(f"\nTotal execution time: {duration:.2f} seconds")
    print(f"Detailed results saved to: {detailed_output_file}")
    print(f"Comparison results saved to: {comparison_file}")

def main():
    """
    Main function to run parallel tests
    """
    # asyncio.run creates, runs, and closes the event loop
    # (replaces the deprecated get_event_loop()/run_until_complete pattern)
    asyncio.run(main_async())


if __name__ == "__main__":
    main()
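
# Example invocation (the filename below is illustrative; use whatever this script
# is saved as), with HF_TOKEN set in the environment or in a local .env file:
#   python run_parallel_lighteval_tests.py
# Detailed per-model results are written to parallel_test_detailed_results.json and
# the sorted summary to models_comparison.json, as produced by main_async() above.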