demo / backend /tests /test_provider_parallel_support.py
tfrere's picture
first commit
970eef1
raw
history blame
8.72 kB
#!/usr/bin/env python3
"""
Script pour tester si un fournisseur d'API supporte réellement les requêtes parallèles
"""
import os
import sys
import time
import asyncio
import json
from pathlib import Path
from datetime import datetime
# Ensure environment is properly configured
from dotenv import load_dotenv
load_dotenv()
# Model and inference provider exercised by this script.
MODEL_NAME = "Qwen/QwQ-32B"
PROVIDER = "novita"

# Number of requests sent in each phase (parallel, then sequential).
REQUEST_COUNT = 5

# Prompts sent to the model; run_tests() uses the first REQUEST_COUNT entries.
PROMPTS = [
    "Explain in detail how parallel computing has transformed modern data processing.",
    "Describe the fundamental differences between CPU and GPU architectures.",
    "Analyze the key challenges in distributed systems design.",
    (
        "Discuss the evolution of natural language processing from rule-based "
        "systems to modern transformer architectures."
    ),
    (
        "Explain the concept of quantum computing and how it differs from "
        "classical computing paradigms."
    ),
]
def _is_generation_success(response, returncode):
    """Return True when curl exited with 0 and the body looks like a generation result."""
    if returncode != 0:
        return False
    try:
        payload = json.loads(response)
    except json.JSONDecodeError:
        # Non-JSON body: fall back to a plain-text heuristic.
        return "error" not in response.lower()
    # An empty list or a list of non-dict items is not a generation result;
    # check the shape explicitly instead of relying on a caught exception.
    return (
        isinstance(payload, list)
        and len(payload) > 0
        and isinstance(payload[0], dict)
        and "generated_text" in payload[0]
    )


def _describe_failure(response, stderr_output, returncode):
    """Build a non-empty, string-typed error description for a failed request."""
    if "error" in response.lower():
        try:
            payload = json.loads(response)
        except json.JSONDecodeError:
            return f"Erreur non-JSON: {response}"
        if isinstance(payload, dict) and "error" in payload:
            error = payload["error"]
            # The "error" value may itself be a list or dict; always hand back
            # a string so callers can slice and print it safely.
            if isinstance(error, str):
                return error
            return json.dumps(error, ensure_ascii=False)
        return f"Réponse: {response}"
    if stderr_output:
        return stderr_output
    if response:
        return f"Réponse: {response}"
    # curl runs with -s, so a transport failure can leave both streams empty;
    # the exit code is then the only diagnostic available.
    return f"Erreur inconnue. Code: {returncode}"


async def send_request(prompt, request_id=None, show_logs=True):
    """Send one generation request through curl and time it.

    Args:
        prompt: Text sent as the "inputs" field of the JSON payload.
        request_id: Identifier used in log lines and echoed in the result.
            When it is None, nothing is logged.
        show_logs: Set to False to silence the start/end/error log lines.

    Returns:
        A dict with the keys "request_id", "prompt", "start_time",
        "end_time", "duration" (seconds), "success", "response" (decoded
        stdout of curl) and "error_message" (None on success, otherwise a
        string).
    """
    log_enabled = show_logs and request_id is not None
    if log_enabled:
        print(f"Démarrage requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    start_time = time.time()

    # NOTE: the token is passed on the curl command line. If HF_TOKEN is not
    # set, the header literally reads "Bearer None".
    cmd_args = [
        "curl", "-s",
        "-X", "POST",
        f"https://api-inference.huggingface.co/models/{MODEL_NAME}",
        "-H", f"Authorization: Bearer {os.environ.get('HF_TOKEN')}",
        "-H", "Content-Type: application/json",
        "-d", json.dumps({
            "inputs": prompt,
            "parameters": {
                "provider": PROVIDER,
                "max_new_tokens": 20,
            },
        }),
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()

    end_time = time.time()
    duration = end_time - start_time

    # Malformed bytes in the output must not abort the whole benchmark.
    response = stdout.decode("utf-8", errors="replace")
    stderr_output = stderr.decode("utf-8", errors="replace")

    is_success = _is_generation_success(response, process.returncode)
    error_message = None
    if not is_success:
        error_message = _describe_failure(response, stderr_output, process.returncode)

    if log_enabled:
        print(f"Fin requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]} (durée: {duration:.2f}s)")
        if not is_success:
            # Truncate long messages, but always keep the request prefix.
            shown = error_message
            if len(shown) > 100:
                shown = f"{shown[:100]}..."
            print(f"ERREUR requête {request_id}: {shown}")

    return {
        "request_id": request_id,
        "prompt": prompt,
        "start_time": start_time,
        "end_time": end_time,
        "duration": duration,
        "success": is_success,
        "response": response,
        "error_message": error_message,
    }
async def run_parallel_requests(prompts):
    """Send every prompt concurrently and time the whole batch.

    One task is created per prompt. Each task waits on a shared event so
    that all requests are released at the same moment.

    Returns:
        A tuple ``(results, duration)``: the list of ``send_request`` result
        dicts in prompt order, and the seconds elapsed between releasing the
        tasks and the last one finishing.
    """
    print(f"\n=== Test parallèle: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")

    # Gate shared by all tasks; nothing is sent until it is set.
    go_signal = asyncio.Event()

    async def gated_request(text, index):
        await go_signal.wait()
        return await send_request(text, index)

    pending = [
        asyncio.create_task(gated_request(text, index))
        for index, text in enumerate(prompts)
    ]

    # Give the event loop time to start every task and park it on the gate.
    await asyncio.sleep(1)

    # Release all requests at once; timing starts here.
    released_at = time.time()
    print(f"Démarrage synchronisé à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    go_signal.set()

    outcomes = await asyncio.gather(*pending)
    elapsed = time.time() - released_at

    print(f"Test parallèle terminé en {elapsed:.2f}s\n")
    return outcomes, elapsed
async def run_sequential_requests(prompts):
    """Send the prompts one after another and time the whole run.

    Returns:
        A tuple ``(results, duration)``: the list of ``send_request`` result
        dicts in prompt order, and the total elapsed seconds.
    """
    print(f"\n=== Test séquentiel: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")

    began_at = time.time()
    outcomes = []
    for index, text in enumerate(prompts):
        print(f"Requête séquentielle {index}...")
        # Each request is awaited to completion before the next one starts.
        outcomes.append(await send_request(text, index))
    elapsed = time.time() - began_at

    print(f"Test séquentiel terminé en {elapsed:.2f}s\n")
    return outcomes, elapsed
def _classify_parallelism(parallelism_factor, request_count):
    """Map a measured speed-up factor to the conclusion string shown to the user."""
    if request_count < 2:
        # With 0 or 1 request the "80% of the request count" threshold is at
        # most 0.8, which would label any run as excellent.
        return "NON CONCLUANT (au moins 2 requêtes sont nécessaires pour mesurer le parallélisme)"
    if parallelism_factor >= request_count * 0.8:
        return "EXCELLENT parallélisme (proche du théorique maximum)"
    if parallelism_factor >= 2:
        return "BON parallélisme (significativement meilleur que séquentiel)"
    if parallelism_factor >= 1.3:
        return "MOYEN parallélisme (légèrement meilleur que séquentiel)"
    return "FAIBLE ou PAS DE parallélisme (pas d'avantage significatif)"


async def run_tests():
    """Run the parallel test, then the sequential test, and compare them.

    Prints a summary to stdout and writes the detailed results to a
    timestamped ``parallel_test_<provider>_<timestamp>.json`` file in the
    current working directory.
    """
    prompts = PROMPTS[:REQUEST_COUNT]

    # 1. Parallel run, 2. sequential run over the same prompts.
    parallel_results, parallel_duration = await run_parallel_requests(prompts)
    sequential_results, sequential_duration = await run_sequential_requests(prompts)

    # 3. Metrics.
    parallel_success = sum(1 for r in parallel_results if r["success"])
    sequential_success = sum(1 for r in sequential_results if r["success"])

    # Effective parallelism factor: sequential time divided by parallel time.
    if parallel_duration > 0:
        parallelism_factor = sequential_duration / parallel_duration
    else:
        parallelism_factor = 0

    # Relative time saved by the parallel run, in percent.
    if sequential_duration > 0:
        improvement_percent = (1 - (parallel_duration / sequential_duration)) * 100
    else:
        improvement_percent = 0

    print("\n====== RÉSUMÉ DES TESTS ======")
    print(f"Modèle: {MODEL_NAME}, Provider: {PROVIDER}, Requêtes: {len(prompts)}")
    print(f"\nDurée test parallèle: {parallel_duration:.2f}s ({parallel_success}/{len(prompts)} réussies)")
    print(f"Durée test séquentiel: {sequential_duration:.2f}s ({sequential_success}/{len(prompts)} réussies)")
    print(f"Facteur de parallélisme: {parallelism_factor:.2f}x")
    print(f"Amélioration: {improvement_percent:.1f}%")

    conclusion = _classify_parallelism(parallelism_factor, len(prompts))
    print(f"\nConclusion: {conclusion}")

    # The durations include failed requests, so flag the comparison as
    # unreliable whenever any request failed.
    if parallel_success < len(prompts) or sequential_success < len(prompts):
        print("ATTENTION: certaines requêtes ont échoué; les durées mesurées ne sont pas fiables")

    # Persist the detailed results.
    output_file = f"parallel_test_{PROVIDER}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "model": MODEL_NAME,
            "provider": PROVIDER,
            "request_count": len(prompts),
            "parallel_duration": parallel_duration,
            "sequential_duration": sequential_duration,
            "parallel_success": parallel_success,
            "sequential_success": sequential_success,
            "parallelism_factor": parallelism_factor,
            "improvement_percent": improvement_percent,
            "conclusion": conclusion,
            "parallel_results": parallel_results,
            "sequential_results": sequential_results,
        }, f, indent=2)

    print(f"\nRésultats détaillés sauvegardés dans {output_file}")
if __name__ == "__main__":
    # Script entry point: run the whole comparison on a fresh event loop.
    asyncio.run(run_tests())