#!/usr/bin/env python
"""
Script to benchmark the performance of different providers for a given model.

Usage: python model_provider_benchmark.py [--model "model_name"] [--output results.json] [--questions 5]
"""

import argparse
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv
from huggingface_hub import model_info

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("provider_benchmark")

# Default models to test
DEFAULT_MODELS = [
    "Qwen/Qwen2.5-72B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",
    "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
    "Qwen/QwQ-32B",
    "mistralai/Mistral-Small-24B-Instruct-2501",
]

# Questions to benchmark the models
DEFAULT_QUESTIONS = [
    "What are the key benefits of using distributed systems?",
    "Explain the concept of quantum computing in simple terms.",
    "What are the ethical considerations in artificial intelligence?",
    "Compare and contrast supervised and unsupervised learning.",
    "How does blockchain technology ensure security and transparency?",
]


def get_model_providers(model_name: str) -> List[str]:
    """
    Gets all available providers for a given model.

    Args:
        model_name: Name of the model on the Hub

    Returns:
        List of available providers
    """
    try:
        info = model_info(model_name, expand="inferenceProviderMapping")
        if hasattr(info, "inference_provider_mapping"):
            providers = list(info.inference_provider_mapping.keys())
            return providers
        else:
            logger.warning(f"No providers available for {model_name}")
            return []
    except Exception as e:
        logger.error(f"Error while retrieving providers for {model_name}: {e}")
        return []
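
# Illustrative call (the provider names below are assumptions; the real list is
# whatever the Hub reports for the model at call time):
#   get_model_providers("Qwen/Qwen2.5-72B-Instruct")
#   -> ["together", "nebius", "fireworks-ai"]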


def query_model(
    model: str,
    provider: str,
    prompt: str,
    token: str
) -> Tuple[str, float]:
    """
    Sends a request to a model via the Hugging Face Inference API.

    Args:
        model: Model name
        provider: Provider name
        prompt: Question to ask
        token: HF token for authentication

    Returns:
        Tuple containing the response and execution time
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 100,
            "temperature": 0.7,
            "top_p": 0.9,
            "do_sample": True,
            "provider": provider  # Pass the provider in the generation parameters
        }
    }

    # Build the Inference API URL (the provider travels in the payload, not in the URL)
    api_url = f"https://api-inference.huggingface.co/models/{model}"

    start_time = time.time()
    try:
        # Add a small delay between requests to avoid rate limiting
        time.sleep(0.5)
        response = requests.post(api_url, headers=headers, json=payload)

        # Check for specific error cases
        if response.status_code != 200:
            try:
                error_data = response.json()
                error_msg = error_data.get("error", str(error_data))
            except ValueError:
                error_msg = response.text
            logger.error(f"Error for {model} ({provider}): {error_msg}")
            return f"ERROR: {error_msg}", 0

        response.raise_for_status()
        result = response.json()

        # The API can return different formats, so normalize the answer
        if isinstance(result, list) and len(result) > 0:
            if "generated_text" in result[0]:
                answer = result[0]["generated_text"]
            else:
                answer = str(result)
        elif isinstance(result, dict):
            if "generated_text" in result:
                answer = result["generated_text"]
            else:
                answer = str(result)
        else:
            answer = str(result)
    except requests.exceptions.RequestException as e:
        error_msg = str(e)
        logger.error(f"Error for {model} ({provider}): {error_msg}")
        return f"ERROR: {error_msg}", 0
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error for {model} ({provider}): {error_msg}")
        return f"ERROR: {error_msg}", 0

    end_time = time.time()
    execution_time = end_time - start_time
    return answer, execution_time
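
# Minimal usage sketch (assumes HF_TOKEN is set in the environment and that the
# "together" provider is offered for this model; both are assumptions here):
#   answer, seconds = query_model(
#       model="Qwen/Qwen2.5-72B-Instruct",
#       provider="together",
#       prompt="Explain the concept of quantum computing in simple terms.",
#       token=os.environ["HF_TOKEN"],
#   )
#   print(f"{seconds:.2f}s -> {answer[:80]}")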


def run_benchmark(
    model: str,
    questions: List[str] = DEFAULT_QUESTIONS,
    output_file: str = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Runs a benchmark for all model/provider combinations.

    Args:
        model: Name of the model to test
        questions: List of questions to ask
        output_file: Path to the output JSON file (optional; currently unused,
            since the caller aggregates and saves all results itself)

    Returns:
        List of ranked providers or None in case of error
    """
    # Load environment variables
    load_dotenv()

    # Get the HF token from the environment (never read the .env file directly)
    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        logger.error("HF_TOKEN not defined")
        return None

    # Get all available providers for this model
    providers = get_model_providers(model)
    if not providers:
        logger.warning(f"No providers for {model}")
        return None

    logger.info(f"Testing {model} with providers: {', '.join(providers)}")

    # Structure to store results
    results = {
        "providers": {}
    }

    # Test each provider
    for provider in providers:
        logger.info(f"Provider: {provider}")
        provider_results = {
            "questions": [],
            "total_time": 0,
            "average_time": 0,
            "success_rate": 0
        }
        successful_queries = 0
        total_time = 0

        # Ask each question
        for question in questions:
            answer, execution_time = query_model(
                model=model,
                provider=provider,
                prompt=question,
                token=hf_token
            )

            # Check whether the request was successful
            is_error = answer.startswith("ERROR:")
            if not is_error:
                successful_queries += 1
                total_time += execution_time

            # Save the results for this question
            provider_results["questions"].append({
                "question": question,
                "time": execution_time,
                "success": not is_error,
                "answer": answer[:100] + "..." if len(answer) > 100 else answer
            })

        # Compute global metrics for this provider
        provider_results["total_time"] = total_time
        provider_results["average_time"] = total_time / successful_queries if successful_queries > 0 else 0
        provider_results["success_rate"] = successful_queries / len(questions)

        # Add the results for this provider
        results["providers"][provider] = provider_results

    # Check that at least one provider succeeded
    if not any(data["success_rate"] > 0 for data in results["providers"].values()):
        logger.warning(f"No successful providers for {model}")
        return None

    # Rank providers: successful ones by total time, failed ones last
    sorted_providers = sorted(
        results["providers"].items(),
        key=lambda x: x[1]["total_time"] if x[1]["success_rate"] > 0 else float('inf')
    )

    # Return only the ranked list of providers
    return [
        {
            "provider": provider,
            "total_time": data["total_time"],
            "success_rate": data["success_rate"],
            "average_time": data["average_time"]
        }
        for provider, data in sorted_providers
    ]
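
# Illustrative shape of the return value (provider names and timings are made up):
# [
#     {"provider": "together", "total_time": 11.42, "success_rate": 1.0, "average_time": 2.28},
#     {"provider": "nebius", "total_time": 16.07, "success_rate": 0.8, "average_time": 4.02},
# ]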


def display_results(model: str, results: List[Dict[str, Any]]) -> None:
    """
    Displays benchmark results in a readable format.

    Args:
        model: Model name
        results: List of ranked providers
    """
    print(f"\n===== Benchmark Results for {model} =====")
    print(f"Number of providers tested: {len(results)}")
    print("\nProvider Rankings (fastest to slowest):")
    print("-" * 80)
    print(f"{'Rank':<6} {'Provider':<20} {'Success Rate':<15} {'Total Time (s)':<20} {'Avg Time (s)':<15}")
    print("-" * 80)
    for i, provider_data in enumerate(results, 1):
        print(f"{i:<6} {provider_data['provider']:<20} {provider_data['success_rate']*100:>6.1f}% {provider_data['total_time']:>8.2f}s {provider_data['average_time']:>6.2f}s")


def calculate_model_rankings(all_results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Calculates model rankings based on their performance.

    Args:
        all_results: Complete benchmark results

    Returns:
        List of models ranked by performance
    """
    model_rankings = []

    for model_name, results in all_results["models"].items():
        if results is None:
            continue

        # Find the fastest provider with a good success rate
        best_provider = None
        best_time = float('inf')
        best_success_rate = 0

        for provider_data in results:
            if provider_data["success_rate"] >= 0.8:  # Only consider providers with at least an 80% success rate
                if provider_data["total_time"] < best_time:
                    best_time = provider_data["total_time"]
                    best_success_rate = provider_data["success_rate"]
                    best_provider = provider_data["provider"]

        if best_provider:
            model_rankings.append({
                "model": model_name,
                "best_provider": best_provider,
                "total_time": best_time,
                "success_rate": best_success_rate,
                "average_time": best_time / 5  # Assumes the default of 5 questions was used
            })

    # Sort by total time (fastest first)
    return sorted(model_rankings, key=lambda x: x["total_time"])
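
# Illustrative shape of the returned rankings (all values are made up):
# [
#     {"model": "Qwen/Qwen2.5-72B-Instruct", "best_provider": "together",
#      "total_time": 11.42, "success_rate": 1.0, "average_time": 2.28},
# ]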


def display_final_rankings(model_rankings: List[Dict[str, Any]]) -> None:
    """
    Displays the final model rankings.

    Args:
        model_rankings: List of ranked models
    """
    print("\n" + "="*80)
    print("FINAL MODEL RANKINGS (fastest to slowest)")
    print("="*80)
    print(f"{'Rank':<6} {'Model':<40} {'Provider':<20} {'Total Time (s)':<15} {'Success Rate':<15}")
    print("-"*80)
    for i, model_data in enumerate(model_rankings, 1):
        print(f"{i:<6} {model_data['model']:<40} {model_data['best_provider']:<20} "
              f"{model_data['total_time']:>8.2f}s {model_data['success_rate']*100:>6.1f}%")


def display_final_summary(all_results: Dict[str, Any]) -> None:
    """
    Displays a final summary with ranked providers for each model.

    Args:
        all_results: Complete benchmark results
    """
    print("\n" + "="*100)
    print("FINAL SUMMARY OF PROVIDERS BY MODEL")
    print("="*100)

    for model_name, results in all_results["models"].items():
        if results is None:
            print(f"\n{model_name}:")
            print("  No successful providers found")
            continue

        print(f"\n{model_name}:")
        print("  Successful providers:")
        for provider_data in results:
            if provider_data["success_rate"] > 0:
                print(f"    - {provider_data['provider']} (Success rate: {provider_data['success_rate']*100:.1f}%, Avg time: {provider_data['average_time']:.2f}s)")

        # Check for failed providers
        failed_providers = [p for p in results if p["success_rate"] == 0]
        if failed_providers:
            print("  Failed providers:")
            for provider_data in failed_providers:
                print(f"    - {provider_data['provider']}")


def main():
    """
    Main entry point for the script.
    """
    parser = argparse.ArgumentParser(description="Tests the performance of model providers.")
    parser.add_argument("--model", type=str, help="Name of the model to test (if not specified, all default models will be tested)")
    parser.add_argument("--output", type=str, default="benchmark_results.json", help="Path to the output JSON file")
    parser.add_argument("--questions", type=int, default=5, help="Number of questions to ask (default: 5)")
    args = parser.parse_args()

    # Limit the number of questions to the maximum available
    num_questions = min(args.questions, len(DEFAULT_QUESTIONS))
    questions = DEFAULT_QUESTIONS[:num_questions]

    # Determine which models to test
    models_to_test = [args.model] if args.model else DEFAULT_MODELS

    # Structure to store all results
    all_results = {
        "timestamp": datetime.now().isoformat(),
        "models": {}
    }

    # Test each model
    for model in models_to_test:
        logger.info(f"\nModel: {model}")
        results = run_benchmark(
            model=model,
            questions=questions,
            output_file=None  # Per-model results are not saved; everything is written to one file below
        )
        all_results["models"][model] = results

    # Save all results
    with open(args.output, "w") as f:
        json.dump(all_results, f, indent=2)
    logger.info(f"\nResults saved to {args.output}")

    # Display only the final summary
    display_final_summary(all_results)


if __name__ == "__main__":
    main()
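
# Example invocations (the model name and output path below are just illustrations):
#   python model_provider_benchmark.py
#   python model_provider_benchmark.py --model "Qwen/Qwen2.5-72B-Instruct" --questions 3 --output qwen_results.json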