Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
File size: 8,717 Bytes
970eef1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
#!/usr/bin/env python3
"""
Script pour tester si un fournisseur d'API supporte réellement les requêtes parallèles
"""
import os
import sys
import time
import asyncio
import json
from pathlib import Path
from datetime import datetime
# Ensure environment is properly configured
from dotenv import load_dotenv
load_dotenv()
# Définir le modèle et le fournisseur à tester
MODEL_NAME = "Qwen/QwQ-32B"
PROVIDER = "novita"
REQUEST_COUNT = 5 # Nombre de requêtes
# Liste de questions
PROMPTS = [
"Explain in detail how parallel computing has transformed modern data processing.",
"Describe the fundamental differences between CPU and GPU architectures.",
"Analyze the key challenges in distributed systems design.",
"Discuss the evolution of natural language processing from rule-based systems to modern transformer architectures.",
"Explain the concept of quantum computing and how it differs from classical computing paradigms."
]
async def send_request(prompt, request_id=None, show_logs=True):
"""Envoie une requête au modèle et mesure le temps d'exécution"""
if show_logs and request_id is not None:
print(f"Démarrage requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
start_time = time.time()
cmd_args = [
"curl", "-s",
"-X", "POST",
f"https://api-inference.huggingface.co/models/{MODEL_NAME}",
"-H", f"Authorization: Bearer {os.environ.get('HF_TOKEN')}",
"-H", "Content-Type: application/json",
"-d", json.dumps({
"inputs": prompt,
"parameters": {
"provider": PROVIDER,
"max_new_tokens": 20
}
})
]
process = await asyncio.create_subprocess_exec(
*cmd_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
end_time = time.time()
duration = end_time - start_time
response = stdout.decode("utf-8")
stderr_output = stderr.decode("utf-8")
# Déterminer le succès
is_success = False
try:
response_json = json.loads(response)
is_success = process.returncode == 0 and isinstance(response_json, list) and "generated_text" in response_json[0]
except json.JSONDecodeError:
is_success = process.returncode == 0 and not ("error" in response.lower())
except Exception:
is_success = process.returncode == 0
# Extraire message d'erreur si échec
error_message = None
if not is_success:
try:
if "error" in response.lower():
try:
response_json = json.loads(response)
if "error" in response_json:
error_message = response_json["error"]
except:
error_message = f"Erreur non-JSON: {response}"
elif stderr_output:
error_message = stderr_output
else:
error_message = f"Réponse: {response}"
except:
error_message = f"Erreur inconnue. Code: {process.returncode}"
if show_logs and request_id is not None:
print(f"Fin requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]} (durée: {duration:.2f}s)")
if not is_success:
print(f"ERREUR requête {request_id}: {error_message[:100]}..." if error_message and len(error_message) > 100 else error_message)
return {
"request_id": request_id,
"prompt": prompt,
"start_time": start_time,
"end_time": end_time,
"duration": duration,
"success": is_success,
"response": response,
"error_message": error_message
}
async def run_parallel_requests(prompts):
"""Exécute les requêtes en parallèle"""
print(f"\n=== Test parallèle: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")
# Synchroniser le démarrage des requêtes
start_event = asyncio.Event()
async def synchronized_request(prompt, req_id):
await start_event.wait()
return await send_request(prompt, req_id)
# Créer toutes les tâches
tasks = [asyncio.create_task(synchronized_request(prompts[i], i)) for i in range(len(prompts))]
# Attendre que toutes les tâches soient prêtes
await asyncio.sleep(1)
# Lancer toutes les requêtes en même temps
parallel_start_time = time.time()
print(f"Démarrage synchronisé à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
start_event.set()
# Attendre que toutes les tâches se terminent
results = await asyncio.gather(*tasks)
parallel_end_time = time.time()
parallel_duration = parallel_end_time - parallel_start_time
print(f"Test parallèle terminé en {parallel_duration:.2f}s\n")
return results, parallel_duration
async def run_sequential_requests(prompts):
"""Exécute les mêmes requêtes séquentiellement"""
print(f"\n=== Test séquentiel: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")
sequential_start_time = time.time()
results = []
for i, prompt in enumerate(prompts):
print(f"Requête séquentielle {i}...")
result = await send_request(prompt, i)
results.append(result)
sequential_end_time = time.time()
sequential_duration = sequential_end_time - sequential_start_time
print(f"Test séquentiel terminé en {sequential_duration:.2f}s\n")
return results, sequential_duration
async def run_tests():
"""Exécute les tests parallèles puis séquentiels et compare les résultats"""
global_start = time.time()
prompts = PROMPTS[:REQUEST_COUNT] # Utiliser le nombre de prompts spécifié
# 1. Test parallèle
parallel_results, parallel_duration = await run_parallel_requests(prompts)
# 2. Test séquentiel
sequential_results, sequential_duration = await run_sequential_requests(prompts)
# 3. Analyser les résultats
global_end = time.time()
total_duration = global_end - global_start
# Calculer les métriques
parallel_success = sum(1 for r in parallel_results if r["success"])
sequential_success = sum(1 for r in sequential_results if r["success"])
# Calculer le facteur de parallélisme réel (temps séquentiel / temps parallèle)
if parallel_duration > 0:
parallelism_factor = sequential_duration / parallel_duration
else:
parallelism_factor = 0
# Pourcentage d'amélioration
improvement_percent = (1 - (parallel_duration / sequential_duration)) * 100 if sequential_duration > 0 else 0
# Afficher le résumé
print("\n====== RÉSUMÉ DES TESTS ======")
print(f"Modèle: {MODEL_NAME}, Provider: {PROVIDER}, Requêtes: {len(prompts)}")
print(f"\nDurée test parallèle: {parallel_duration:.2f}s ({parallel_success}/{len(prompts)} réussies)")
print(f"Durée test séquentiel: {sequential_duration:.2f}s ({sequential_success}/{len(prompts)} réussies)")
print(f"Facteur de parallélisme: {parallelism_factor:.2f}x")
print(f"Amélioration: {improvement_percent:.1f}%")
if parallelism_factor >= len(prompts) * 0.8:
conclusion = "EXCELLENT parallélisme (proche du théorique maximum)"
elif parallelism_factor >= 2:
conclusion = "BON parallélisme (significativement meilleur que séquentiel)"
elif parallelism_factor >= 1.3:
conclusion = "MOYEN parallélisme (légèrement meilleur que séquentiel)"
else:
conclusion = "FAIBLE ou PAS DE parallélisme (pas d'avantage significatif)"
print(f"\nConclusion: {conclusion}")
# Enregistrer les résultats
output_file = f"parallel_test_{PROVIDER}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(output_file, 'w') as f:
json.dump({
"model": MODEL_NAME,
"provider": PROVIDER,
"request_count": len(prompts),
"parallel_duration": parallel_duration,
"sequential_duration": sequential_duration,
"parallelism_factor": parallelism_factor,
"improvement_percent": improvement_percent,
"conclusion": conclusion,
"parallel_results": parallel_results,
"sequential_results": sequential_results
}, f, indent=2)
print(f"\nRésultats détaillés sauvegardés dans {output_file}")
if __name__ == "__main__":
asyncio.run(run_tests()) |