#!/usr/bin/env python3
"""
Test whether an API provider really supports parallel requests.

The script sends the same set of prompts to the Hugging Face inference API
twice -- first all at once, then one after another -- and compares the two
wall-clock durations. Each HTTP call is made by spawning ``curl`` as an
asyncio subprocess. A summary is printed and the detailed per-request
results are written to a timestamped JSON file in the current directory.
"""

import os
import sys
import time
import asyncio
import json
from pathlib import Path
from datetime import datetime

# Ensure environment is properly configured: load variables (notably
# HF_TOKEN) from a local .env file when python-dotenv is installed. The
# package is optional so the script still runs when the token is already
# exported in the environment.
try:
    from dotenv import load_dotenv
except ImportError:
    print("python-dotenv introuvable: fichier .env ignoré", file=sys.stderr)
else:
    load_dotenv()

# Model and provider under test
MODEL_NAME = "Qwen/QwQ-32B"
PROVIDER = "novita"
REQUEST_COUNT = 5  # Number of requests

# List of questions
PROMPTS = [
    "Explain in detail how parallel computing has transformed modern data processing.",
    "Describe the fundamental differences between CPU and GPU architectures.",
    "Analyze the key challenges in distributed systems design.",
    "Discuss the evolution of natural language processing from rule-based systems to modern transformer architectures.",
    "Explain the concept of quantum computing and how it differs from classical computing paradigms."
]


def build_curl_command(prompt):
    """Return the curl argument list that POSTs *prompt* to the model endpoint."""
    payload = json.dumps({
        "inputs": prompt,
        "parameters": {
            "provider": PROVIDER,
            "max_new_tokens": 20
        }
    })
    command = [
        # -S keeps curl's own error messages on stderr even though -s
        # silences the progress meter; without it a transport failure
        # leaves nothing to report.
        "curl", "-s", "-S", "-X", "POST",
        f"https://api-inference.huggingface.co/models/{MODEL_NAME}",
    ]
    token = os.environ.get("HF_TOKEN")
    if token:
        # Only send credentials when a token exists; formatting a missing
        # token would send the literal header "Bearer None".
        # NOTE: the token is visible in the process list while curl runs.
        command += ["-H", f"Authorization: Bearer {token}"]
    command += ["-H", "Content-Type: application/json", "-d", payload]
    return command


def evaluate_response(returncode, response, stderr_output=""):
    """Classify the outcome of one curl call.

    Args:
        returncode: Exit status of the curl process.
        response: Decoded standard output (the HTTP response body).
        stderr_output: Decoded standard error of the curl process.

    Returns:
        A ``(is_success, error_message)`` tuple. ``error_message`` is
        ``None`` on success and a non-empty string on failure.
    """
    try:
        payload = json.loads(response)
        is_json = True
    except json.JSONDecodeError:
        payload = None
        is_json = False

    if is_json:
        # A successful generation is a non-empty list whose first item is
        # a dict carrying "generated_text". Checking the shape explicitly
        # keeps an empty list or a scalar item from counting as success.
        is_success = (
            returncode == 0
            and isinstance(payload, list)
            and bool(payload)
            and isinstance(payload[0], dict)
            and "generated_text" in payload[0]
        )
    else:
        # Non-JSON body: accept it only if it is non-empty and does not
        # mention an error.
        is_success = (
            returncode == 0
            and bool(response.strip())
            and "error" not in response.lower()
        )

    if is_success:
        return True, None

    error_message = None
    if "error" in response.lower():
        if not is_json:
            error_message = f"Erreur non-JSON: {response}"
        elif isinstance(payload, dict) and payload.get("error"):
            detail = payload["error"]
            # The "error" value may be a list or dict; always hand back a
            # string so callers can slice and print it safely.
            if isinstance(detail, str):
                error_message = detail
            else:
                error_message = json.dumps(detail, ensure_ascii=False)

    if not error_message:
        if stderr_output:
            error_message = stderr_output
        elif response.strip():
            error_message = f"Réponse: {response}"
        else:
            error_message = f"Erreur inconnue. Code: {returncode}"

    return False, error_message


def format_error_log(request_id, error_message):
    """Return the console line for a failed request, truncated to 100 characters."""
    text = error_message or ""
    if len(text) > 100:
        text = f"{text[:100]}..."
    return f"ERREUR requête {request_id}: {text}"


def classify_parallelism(parallelism_factor, request_count):
    """Return the textual verdict for a measured speed-up factor."""
    if parallelism_factor >= request_count * 0.8:
        return "EXCELLENT parallélisme (proche du théorique maximum)"
    if parallelism_factor >= 2:
        return "BON parallélisme (significativement meilleur que séquentiel)"
    if parallelism_factor >= 1.3:
        return "MOYEN parallélisme (légèrement meilleur que séquentiel)"
    return "FAIBLE ou PAS DE parallélisme (pas d'avantage significatif)"


async def send_request(prompt, request_id=None, show_logs=True):
    """Send one request to the model and measure how long it takes.

    Args:
        prompt: Text sent as the model input.
        request_id: Identifier used in log lines; logging is skipped when
            it is ``None``.
        show_logs: Whether to print start/end/error lines.

    Returns:
        A dict with the request id, prompt, start/end timestamps (epoch
        seconds), duration, success flag, raw response and error message.
    """
    verbose = show_logs and request_id is not None
    if verbose:
        print(f"Démarrage requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    start_time = time.time()

    process = await asyncio.create_subprocess_exec(
        *build_curl_command(prompt),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()

    end_time = time.time()
    duration = end_time - start_time

    # errors="replace" keeps a malformed byte sequence from aborting the run.
    response = stdout.decode("utf-8", errors="replace")
    stderr_output = stderr.decode("utf-8", errors="replace")

    is_success, error_message = evaluate_response(
        process.returncode, response, stderr_output
    )

    if verbose:
        print(f"Fin requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]} (durée: {duration:.2f}s)")
        if not is_success:
            print(format_error_log(request_id, error_message))

    return {
        "request_id": request_id,
        "prompt": prompt,
        "start_time": start_time,
        "end_time": end_time,
        "duration": duration,
        "success": is_success,
        "response": response,
        "error_message": error_message
    }


async def run_parallel_requests(prompts):
    """Run all requests concurrently.

    Returns:
        A ``(results, duration)`` tuple: the list of per-request result
        dicts and the wall-clock seconds from the synchronized start
        until the last request finished.
    """
    print(f"\n=== Test parallèle: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")

    # Every task blocks on this event so that all requests start together.
    start_event = asyncio.Event()

    async def synchronized_request(prompt, req_id):
        await start_event.wait()
        return await send_request(prompt, req_id)

    # Create all tasks
    tasks = [
        asyncio.create_task(synchronized_request(prompt, req_id))
        for req_id, prompt in enumerate(prompts)
    ]

    # Give every task time to reach the event wait
    await asyncio.sleep(1)

    # Release all requests at the same moment
    parallel_start_time = time.time()
    print(f"Démarrage synchronisé à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    start_event.set()

    # Wait for every task to finish
    results = await asyncio.gather(*tasks)
    parallel_end_time = time.time()
    parallel_duration = parallel_end_time - parallel_start_time

    print(f"Test parallèle terminé en {parallel_duration:.2f}s\n")
    return results, parallel_duration


async def run_sequential_requests(prompts):
    """Run the same requests one after another.

    Returns:
        A ``(results, duration)`` tuple: the list of per-request result
        dicts and the total wall-clock seconds.
    """
    print(f"\n=== Test séquentiel: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")

    sequential_start_time = time.time()
    results = []

    for i, prompt in enumerate(prompts):
        print(f"Requête séquentielle {i}...")
        result = await send_request(prompt, i)
        results.append(result)

    sequential_end_time = time.time()
    sequential_duration = sequential_end_time - sequential_start_time

    print(f"Test séquentiel terminé en {sequential_duration:.2f}s\n")
    return results, sequential_duration


async def run_tests():
    """Run the parallel test, then the sequential test, and compare them.

    Prints a summary and writes the detailed results to
    ``parallel_test_<provider>_<timestamp>.json``.
    """
    prompts = PROMPTS[:REQUEST_COUNT]  # Use the configured number of prompts

    # 1. Parallel test
    parallel_results, parallel_duration = await run_parallel_requests(prompts)

    # 2. Sequential test
    sequential_results, sequential_duration = await run_sequential_requests(prompts)

    # 3. Analyse the results
    parallel_success = sum(1 for r in parallel_results if r["success"])
    sequential_success = sum(1 for r in sequential_results if r["success"])

    # Actual parallelism factor (sequential time / parallel time)
    if parallel_duration > 0:
        parallelism_factor = sequential_duration / parallel_duration
    else:
        parallelism_factor = 0

    # Improvement percentage
    if sequential_duration > 0:
        improvement_percent = (1 - (parallel_duration / sequential_duration)) * 100
    else:
        improvement_percent = 0

    # Print the summary
    print("\n====== RÉSUMÉ DES TESTS ======")
    print(f"Modèle: {MODEL_NAME}, Provider: {PROVIDER}, Requêtes: {len(prompts)}")
    print(f"\nDurée test parallèle: {parallel_duration:.2f}s ({parallel_success}/{len(prompts)} réussies)")
    print(f"Durée test séquentiel: {sequential_duration:.2f}s ({sequential_success}/{len(prompts)} réussies)")
    print(f"Facteur de parallélisme: {parallelism_factor:.2f}x")
    print(f"Amélioration: {improvement_percent:.1f}%")

    if parallel_success < len(prompts) or sequential_success < len(prompts):
        # Failed requests usually return much faster than real generations,
        # which skews the timing comparison.
        print("ATTENTION: certaines requêtes ont échoué, le facteur de parallélisme peut être faussé.")

    conclusion = classify_parallelism(parallelism_factor, len(prompts))
    print(f"\nConclusion: {conclusion}")

    # Save the results
    output_file = f"parallel_test_{PROVIDER}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w', encoding="utf-8") as f:
        json.dump({
            "model": MODEL_NAME,
            "provider": PROVIDER,
            "request_count": len(prompts),
            "parallel_duration": parallel_duration,
            "sequential_duration": sequential_duration,
            "parallelism_factor": parallelism_factor,
            "improvement_percent": improvement_percent,
            "conclusion": conclusion,
            "parallel_results": parallel_results,
            "sequential_results": sequential_results
        }, f, indent=2)

    print(f"\nRésultats détaillés sauvegardés dans {output_file}")


if __name__ == "__main__":
    asyncio.run(run_tests())