#!/usr/bin/env python3
"""
Script to run lighteval tests in parallel for multiple models
"""
import os
import sys
import json
import time
import tempfile
import asyncio
from pathlib import Path
from typing import Tuple, List, Dict, Any

# Ensure environment is properly configured
from dotenv import load_dotenv
load_dotenv()

# Import yourbench task module
sys.path.append(os.getcwd())
from tasks.yourbench_lighteval_task import create_yourbench_task

# Define models to test
INIT_MODELS = [
    # 70B
    ("Qwen/Qwen2.5-72B-Instruct", "novita"),
    ("meta-llama/Llama-3.3-70B-Instruct", "novita"),
    ("deepseek-ai/DeepSeek-R1-Distill-Llama-70B", "novita"),
    # 20 to 30B
    ("Qwen/QwQ-32B", "novita"),
    # ("mistralai/Mistral-Small-24B-Instruct-2501", "sambanova"),
]


async def run_lighteval_test_for_model(model_info: Tuple[str, str]) -> Dict[str, Any]:
    """
    Run lighteval test for a specific model
    """
    model_name, provider = model_info

    # Parameters
    dataset_name = "yourbench_a"
    organization = "yourbench"
    output_dir = f"uploaded_files/test_parallel_{provider}/lighteval_results"

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Define full dataset path
    dataset_path = f"{organization}/{dataset_name}"
    print(f"Dataset to evaluate for {model_name}: {dataset_path}")

    # Create temporary file holding the custom task definition
    temp_file_path = tempfile.mktemp(suffix=".py")
    print(f"Creating temporary file for {model_name}: {temp_file_path}")

    with open(temp_file_path, 'w') as temp_file:
        temp_file.write(f"""
import os
import sys
sys.path.append("{os.getcwd()}")

from tasks.yourbench_lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{dataset_path}", "lighteval")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")

    # Build lighteval command args
    cmd_args = [
        "lighteval",
        "endpoint",
        "inference-providers",
        f"model={model_name},provider={provider}",
        "custom|yourbench|0|0",
        "--custom-tasks",
        temp_file_path,
        "--max-samples", "5",
        "--output-dir", output_dir,
        "--save-details",
        "--no-push-to-hub"
    ]

    print(f"Running command for {model_name}: {' '.join(cmd_args)}")
    print(f"Start time for {model_name}: {time.strftime('%H:%M:%S')}")

    results = {
        "model_name": model_name,
        "provider": provider,
        "success": False,
        "error": None,
        "results": None,
        "return_code": None
    }

    try:
        # Prepare environment with needed tokens
        env = os.environ.copy()
        hf_token = os.getenv("HF_TOKEN")
        if hf_token:
            env["HF_TOKEN"] = hf_token
            env["HUGGING_FACE_HUB_TOKEN"] = hf_token
        env["HF_ORGANIZATION"] = organization

        # Run the process asynchronously
        process = await asyncio.create_subprocess_exec(
            *cmd_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        # Wait for the process to complete
        stdout, stderr = await process.communicate()

        # Store return code
        exit_code = process.returncode
        results["return_code"] = exit_code

        # Log some output for debugging
        if stdout:
            stdout_lines = stdout.decode().strip().split('\n')
            if stdout_lines:
                print(f"Output from {model_name}: {stdout_lines[0]}")

        # Check if results were generated
        results_dir = Path(output_dir) / "results"
        if results_dir.exists():
            result_files = list(results_dir.glob("**/*.json"))
            if result_files:
                # Read the first results file
                with open(result_files[0], 'r') as f:
                    test_results = json.load(f)
                results["results"] = test_results
                results["success"] = True

    except asyncio.CancelledError:
        results["error"] = "Task cancelled"
        print(f"Task cancelled for {model_name}")
    except Exception as e:
        results["error"] = f"Exception: {str(e)}"
        print(f"Error running test for {model_name}: {str(e)}")
    finally:
        # Delete temporary file
        try:
            os.unlink(temp_file_path)
        except OSError:
            pass

    print(f"End time for {model_name}: {time.strftime('%H:%M:%S')}")
    return results


async def run_parallel_tests(models: List[Tuple[str, str]]) -> List[Dict[str, Any]]:
    """
    Run tests in parallel for multiple models using asyncio
    """
    print(f"Starting parallel tests for {len(models)} models")

    # Create tasks for each model
    tasks = [run_lighteval_test_for_model(model) for model in models]

    # Run all tasks concurrently and gather results
    model_results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results
    results = []
    for i, result in enumerate(model_results):
        if isinstance(result, Exception):
            # Handle exception
            model_name, provider = models[i]
            print(f"Test failed for {model_name}: {str(result)}")
            results.append({
                "model_name": model_name,
                "provider": provider,
                "success": False,
                "error": str(result),
                "results": None,
                "return_code": None
            })
        else:
            # Valid result
            results.append(result)
            print(f"Test completed for {result['model_name']}")

    return results


def format_comparison_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Format results for easy comparison between models
    """
    comparison = {
        "metadata": {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "total_models_tested": len(results),
            "successful_tests": len([r for r in results if r["success"]])
        },
        "models_comparison": []
    }

    # Sort models by accuracy (if available) or name
    sorted_results = sorted(
        results,
        key=lambda x: (
            x["results"]["results"]["all"]["accuracy"] if x["success"] and x["results"] else -1,
            x["model_name"]
        ),
        reverse=True
    )

    for result in sorted_results:
        model_result = {
            "model_name": result["model_name"],
            "provider": result["provider"],
            "success": result["success"]
        }

        if result["success"] and result["results"]:
            model_result.update({
                "accuracy": result["results"]["results"]["all"]["accuracy"],
                "accuracy_stderr": result["results"]["results"]["all"]["accuracy_stderr"],
                "evaluation_time": float(result["results"]["config_general"]["total_evaluation_time_secondes"])
            })
        else:
            model_result["error"] = result["error"]

        comparison["models_comparison"].append(model_result)

    return comparison


async def main_async():
    """
    Async main function to run parallel tests
    """
    print("Starting parallel lighteval tests")
    start_time = time.time()

    # Run tests in parallel
    results = await run_parallel_tests(INIT_MODELS)

    # Save detailed results
    detailed_output_file = "parallel_test_detailed_results.json"
    with open(detailed_output_file, 'w') as f:
        json.dump(results, f, indent=2)

    # Generate and save comparison results
    comparison = format_comparison_results(results)
    comparison_file = "models_comparison.json"
    with open(comparison_file, 'w') as f:
        json.dump(comparison, f, indent=2)

    # Print summary
    print("\nTest Summary:")
    for model in comparison["models_comparison"]:
        status = "✅" if model["success"] else "❌"
        print(f"{status} {model['model_name']} ({model['provider']})")
        if not model["success"]:
            print(f"  Error: {model['error']}")
        else:
            print(f"  Accuracy: {model['accuracy']:.2%} (±{model['accuracy_stderr']:.2%})")
            print(f"  Evaluation time: {model['evaluation_time']:.2f}s")

    duration = time.time() - start_time
    print(f"\nTotal execution time: {duration:.2f} seconds")
    print(f"Detailed results saved to: {detailed_output_file}")
    print(f"Comparison results saved to: {comparison_file}")


def main():
    """
    Main function to run parallel tests
    """
    # Run the async entry point; asyncio.run creates, runs, and closes the event loop
    asyncio.run(main_async())


if __name__ == "__main__":
    main()
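
# Usage sketch (assuming this script is saved as run_parallel_tests.py at the repo root
# and HF_TOKEN is available via .env or the environment):
#
#   python run_parallel_tests.py
#
# Per-model details are written to parallel_test_detailed_results.json and the
# aggregated comparison to models_comparison.json.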