File size: 8,717 Bytes
970eef1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python3
"""
Script pour tester si un fournisseur d'API supporte réellement les requêtes parallèles
"""
import os
import sys
import time
import asyncio
import json
from pathlib import Path
from datetime import datetime

# Ensure environment is properly configured
from dotenv import load_dotenv
load_dotenv()

# Définir le modèle et le fournisseur à tester
MODEL_NAME = "Qwen/QwQ-32B"
PROVIDER = "novita"
REQUEST_COUNT = 5  # Nombre de requêtes

# Liste de questions
PROMPTS = [
    "Explain in detail how parallel computing has transformed modern data processing.",
    "Describe the fundamental differences between CPU and GPU architectures.",
    "Analyze the key challenges in distributed systems design.",
    "Discuss the evolution of natural language processing from rule-based systems to modern transformer architectures.",
    "Explain the concept of quantum computing and how it differs from classical computing paradigms."
]

async def send_request(prompt, request_id=None, show_logs=True):
    """Envoie une requête au modèle et mesure le temps d'exécution"""
    if show_logs and request_id is not None:
        print(f"Démarrage requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    
    start_time = time.time()
    
    cmd_args = [
        "curl", "-s",
        "-X", "POST",
        f"https://api-inference.huggingface.co/models/{MODEL_NAME}",
        "-H", f"Authorization: Bearer {os.environ.get('HF_TOKEN')}",
        "-H", "Content-Type: application/json",
        "-d", json.dumps({
            "inputs": prompt, 
            "parameters": {
                "provider": PROVIDER,
                "max_new_tokens": 20
            }
        })
    ]
    
    process = await asyncio.create_subprocess_exec(
        *cmd_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    
    stdout, stderr = await process.communicate()
    
    end_time = time.time()
    duration = end_time - start_time
    
    response = stdout.decode("utf-8")
    stderr_output = stderr.decode("utf-8")
    
    # Déterminer le succès
    is_success = False
    try:
        response_json = json.loads(response)
        is_success = process.returncode == 0 and isinstance(response_json, list) and "generated_text" in response_json[0]
    except json.JSONDecodeError:
        is_success = process.returncode == 0 and not ("error" in response.lower())
    except Exception:
        is_success = process.returncode == 0
    
    # Extraire message d'erreur si échec
    error_message = None
    if not is_success:
        try:
            if "error" in response.lower():
                try:
                    response_json = json.loads(response)
                    if "error" in response_json:
                        error_message = response_json["error"]
                except:
                    error_message = f"Erreur non-JSON: {response}"
            elif stderr_output:
                error_message = stderr_output
            else:
                error_message = f"Réponse: {response}"
        except:
            error_message = f"Erreur inconnue. Code: {process.returncode}"
    
    if show_logs and request_id is not None:
        print(f"Fin requête {request_id} à {datetime.now().strftime('%H:%M:%S.%f')[:-3]} (durée: {duration:.2f}s)")
        if not is_success:
            print(f"ERREUR requête {request_id}: {error_message[:100]}..." if error_message and len(error_message) > 100 else error_message)
    
    return {
        "request_id": request_id,
        "prompt": prompt,
        "start_time": start_time,
        "end_time": end_time,
        "duration": duration,
        "success": is_success,
        "response": response,
        "error_message": error_message
    }

async def run_parallel_requests(prompts):
    """Exécute les requêtes en parallèle"""
    print(f"\n=== Test parallèle: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")
    
    # Synchroniser le démarrage des requêtes
    start_event = asyncio.Event()
    
    async def synchronized_request(prompt, req_id):
        await start_event.wait()
        return await send_request(prompt, req_id)
    
    # Créer toutes les tâches
    tasks = [asyncio.create_task(synchronized_request(prompts[i], i)) for i in range(len(prompts))]
    
    # Attendre que toutes les tâches soient prêtes
    await asyncio.sleep(1)
    
    # Lancer toutes les requêtes en même temps
    parallel_start_time = time.time()
    print(f"Démarrage synchronisé à {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    start_event.set()
    
    # Attendre que toutes les tâches se terminent
    results = await asyncio.gather(*tasks)
    parallel_end_time = time.time()
    parallel_duration = parallel_end_time - parallel_start_time
    
    print(f"Test parallèle terminé en {parallel_duration:.2f}s\n")
    return results, parallel_duration

async def run_sequential_requests(prompts):
    """Exécute les mêmes requêtes séquentiellement"""
    print(f"\n=== Test séquentiel: {len(prompts)} requêtes pour {MODEL_NAME} ({PROVIDER}) ===")
    print(f"Heure de début: {datetime.now().strftime('%H:%M:%S')}")
    
    sequential_start_time = time.time()
    results = []
    
    for i, prompt in enumerate(prompts):
        print(f"Requête séquentielle {i}...")
        result = await send_request(prompt, i)
        results.append(result)
    
    sequential_end_time = time.time()
    sequential_duration = sequential_end_time - sequential_start_time
    
    print(f"Test séquentiel terminé en {sequential_duration:.2f}s\n")
    return results, sequential_duration

async def run_tests():
    """Exécute les tests parallèles puis séquentiels et compare les résultats"""
    global_start = time.time()
    prompts = PROMPTS[:REQUEST_COUNT]  # Utiliser le nombre de prompts spécifié
    
    # 1. Test parallèle
    parallel_results, parallel_duration = await run_parallel_requests(prompts)
    
    # 2. Test séquentiel
    sequential_results, sequential_duration = await run_sequential_requests(prompts)
    
    # 3. Analyser les résultats
    global_end = time.time()
    total_duration = global_end - global_start
    
    # Calculer les métriques
    parallel_success = sum(1 for r in parallel_results if r["success"])
    sequential_success = sum(1 for r in sequential_results if r["success"])
    
    # Calculer le facteur de parallélisme réel (temps séquentiel / temps parallèle)
    if parallel_duration > 0:
        parallelism_factor = sequential_duration / parallel_duration
    else:
        parallelism_factor = 0
    
    # Pourcentage d'amélioration
    improvement_percent = (1 - (parallel_duration / sequential_duration)) * 100 if sequential_duration > 0 else 0
    
    # Afficher le résumé
    print("\n====== RÉSUMÉ DES TESTS ======")
    print(f"Modèle: {MODEL_NAME}, Provider: {PROVIDER}, Requêtes: {len(prompts)}")
    print(f"\nDurée test parallèle:  {parallel_duration:.2f}s ({parallel_success}/{len(prompts)} réussies)")
    print(f"Durée test séquentiel: {sequential_duration:.2f}s ({sequential_success}/{len(prompts)} réussies)")
    print(f"Facteur de parallélisme: {parallelism_factor:.2f}x")
    print(f"Amélioration: {improvement_percent:.1f}%")
    
    if parallelism_factor >= len(prompts) * 0.8:
        conclusion = "EXCELLENT parallélisme (proche du théorique maximum)"
    elif parallelism_factor >= 2:
        conclusion = "BON parallélisme (significativement meilleur que séquentiel)"
    elif parallelism_factor >= 1.3:
        conclusion = "MOYEN parallélisme (légèrement meilleur que séquentiel)"
    else:
        conclusion = "FAIBLE ou PAS DE parallélisme (pas d'avantage significatif)"
    
    print(f"\nConclusion: {conclusion}")
    
    # Enregistrer les résultats
    output_file = f"parallel_test_{PROVIDER}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w') as f:
        json.dump({
            "model": MODEL_NAME,
            "provider": PROVIDER,
            "request_count": len(prompts),
            "parallel_duration": parallel_duration,
            "sequential_duration": sequential_duration,
            "parallelism_factor": parallelism_factor,
            "improvement_percent": improvement_percent,
            "conclusion": conclusion,
            "parallel_results": parallel_results,
            "sequential_results": sequential_results
        }, f, indent=2)
    
    print(f"\nRésultats détaillés sauvegardés dans {output_file}")

if __name__ == "__main__":
    asyncio.run(run_tests())