#!/usr/bin/env python3
"""
Script to test whether an API provider actually supports parallel requests.
"""
import os
import time
import asyncio
import json
from datetime import datetime

# Ensure the environment is properly configured (HF_TOKEN is read from .env if present)
from dotenv import load_dotenv

load_dotenv()
# Model and provider to test
MODEL_NAME = "Qwen/QwQ-32B"
PROVIDER = "novita"
REQUEST_COUNT = 5  # Number of requests to send

# List of test prompts
PROMPTS = [
    "Explain in detail how parallel computing has transformed modern data processing.",
    "Describe the fundamental differences between CPU and GPU architectures.",
    "Analyze the key challenges in distributed systems design.",
    "Discuss the evolution of natural language processing from rule-based systems to modern transformer architectures.",
    "Explain the concept of quantum computing and how it differs from classical computing paradigms.",
]

async def send_request(prompt, request_id=None, show_logs=True):
    """Send a request to the model and measure its execution time."""
    if show_logs and request_id is not None:
        print(f"Starting request {request_id} at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    start_time = time.time()
    cmd_args = [
        "curl", "-s",
        "-X", "POST",
        f"https://api-inference.huggingface.co/models/{MODEL_NAME}",
        "-H", f"Authorization: Bearer {os.environ.get('HF_TOKEN')}",
        "-H", "Content-Type: application/json",
        "-d", json.dumps({
            "inputs": prompt,
            "parameters": {
                "provider": PROVIDER,
                "max_new_tokens": 20
            }
        })
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    end_time = time.time()
    duration = end_time - start_time
    response = stdout.decode("utf-8")
    stderr_output = stderr.decode("utf-8")
    # Determine whether the request succeeded
    is_success = False
    try:
        response_json = json.loads(response)
        is_success = (
            process.returncode == 0
            and isinstance(response_json, list)
            and len(response_json) > 0
            and "generated_text" in response_json[0]
        )
    except json.JSONDecodeError:
        is_success = process.returncode == 0 and "error" not in response.lower()
    except Exception:
        is_success = process.returncode == 0
    # Extract an error message on failure
    error_message = None
    if not is_success:
        try:
            if "error" in response.lower():
                try:
                    response_json = json.loads(response)
                    if "error" in response_json:
                        error_message = str(response_json["error"])
                except json.JSONDecodeError:
                    error_message = f"Non-JSON error: {response}"
            elif stderr_output:
                error_message = stderr_output
            else:
                error_message = f"Response: {response}"
        except Exception:
            error_message = f"Unknown error. Return code: {process.returncode}"
    if show_logs and request_id is not None:
        print(f"Finished request {request_id} at {datetime.now().strftime('%H:%M:%S.%f')[:-3]} (duration: {duration:.2f}s)")
        if not is_success:
            # Truncate long error messages so the log stays readable
            if error_message and len(error_message) > 100:
                print(f"ERROR request {request_id}: {error_message[:100]}...")
            else:
                print(f"ERROR request {request_id}: {error_message}")
    return {
        "request_id": request_id,
        "prompt": prompt,
        "start_time": start_time,
        "end_time": end_time,
        "duration": duration,
        "success": is_success,
        "response": response,
        "error_message": error_message
    }
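
# Design note (an inference, not stated by the author): each request shells out
# to its own curl process rather than sharing one Python HTTP session, so a
# client-side connection pool cannot serialize the requests. This keeps the
# measurement focused on whether the provider itself handles requests in parallel.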

async def run_parallel_requests(prompts):
    """Run the requests in parallel."""
    print(f"\n=== Parallel test: {len(prompts)} requests for {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Start time: {datetime.now().strftime('%H:%M:%S')}")
    # Synchronize the start of all requests with a shared event
    start_event = asyncio.Event()

    async def synchronized_request(prompt, req_id):
        await start_event.wait()
        return await send_request(prompt, req_id)

    # Create all the tasks
    tasks = [asyncio.create_task(synchronized_request(prompt, i)) for i, prompt in enumerate(prompts)]
    # Give the event loop a moment so every task is waiting on the event
    await asyncio.sleep(1)
    # Release all the requests at the same time
    parallel_start_time = time.time()
    print(f"Synchronized start at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    start_event.set()
    # Wait for every task to finish
    results = await asyncio.gather(*tasks)
    parallel_end_time = time.time()
    parallel_duration = parallel_end_time - parallel_start_time
    print(f"Parallel test finished in {parallel_duration:.2f}s\n")
    return results, parallel_duration

async def run_sequential_requests(prompts):
    """Run the same requests sequentially."""
    print(f"\n=== Sequential test: {len(prompts)} requests for {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Start time: {datetime.now().strftime('%H:%M:%S')}")
    sequential_start_time = time.time()
    results = []
    for i, prompt in enumerate(prompts):
        print(f"Sequential request {i}...")
        result = await send_request(prompt, i)
        results.append(result)
    sequential_end_time = time.time()
    sequential_duration = sequential_end_time - sequential_start_time
    print(f"Sequential test finished in {sequential_duration:.2f}s\n")
    return results, sequential_duration

async def run_tests():
    """Run the parallel test, then the sequential test, and compare the results."""
    global_start = time.time()
    prompts = PROMPTS[:REQUEST_COUNT]  # Use the configured number of prompts
    # 1. Parallel test
    parallel_results, parallel_duration = await run_parallel_requests(prompts)
    # 2. Sequential test
    sequential_results, sequential_duration = await run_sequential_requests(prompts)
    # 3. Analyze the results
    global_end = time.time()
    total_duration = global_end - global_start
    # Success counts
    parallel_success = sum(1 for r in parallel_results if r["success"])
    sequential_success = sum(1 for r in sequential_results if r["success"])
    # Effective parallelism factor (sequential time / parallel time)
    parallelism_factor = sequential_duration / parallel_duration if parallel_duration > 0 else 0
    # Improvement over sequential execution, as a percentage
    improvement_percent = (1 - (parallel_duration / sequential_duration)) * 100 if sequential_duration > 0 else 0
    # Print the summary
    print("\n====== TEST SUMMARY ======")
    print(f"Model: {MODEL_NAME}, Provider: {PROVIDER}, Requests: {len(prompts)}")
    print(f"\nParallel test duration: {parallel_duration:.2f}s ({parallel_success}/{len(prompts)} succeeded)")
    print(f"Sequential test duration: {sequential_duration:.2f}s ({sequential_success}/{len(prompts)} succeeded)")
    print(f"Parallelism factor: {parallelism_factor:.2f}x")
    print(f"Improvement: {improvement_percent:.1f}%")
    if parallelism_factor >= len(prompts) * 0.8:
        conclusion = "EXCELLENT parallelism (close to the theoretical maximum)"
    elif parallelism_factor >= 2:
        conclusion = "GOOD parallelism (significantly faster than sequential)"
    elif parallelism_factor >= 1.3:
        conclusion = "AVERAGE parallelism (slightly faster than sequential)"
    else:
        conclusion = "WEAK or NO parallelism (no significant advantage)"
    print(f"\nConclusion: {conclusion}")
    # Save the detailed results
    output_file = f"parallel_test_{PROVIDER}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, "w") as f:
        json.dump({
            "model": MODEL_NAME,
            "provider": PROVIDER,
            "request_count": len(prompts),
            "parallel_duration": parallel_duration,
            "sequential_duration": sequential_duration,
            "parallelism_factor": parallelism_factor,
            "improvement_percent": improvement_percent,
            "conclusion": conclusion,
            "parallel_results": parallel_results,
            "sequential_results": sequential_results
        }, f, indent=2)
    print(f"\nDetailed results saved to {output_file}")

if __name__ == "__main__":
    asyncio.run(run_tests())
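
# Usage (assumes HF_TOKEN is available in the environment or in a .env file):
#   python parallel_test.py
# The filename above is illustrative; run the script under whatever name it is
# saved as, and adjust MODEL_NAME, PROVIDER and REQUEST_COUNT at the top to
# test other provider/model configurations.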