Spaces:
Sleeping
Sleeping
| import requests | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| import csv | |
# Default load parameters. The ``__main__`` block at the bottom of this file
# rebinds NUM_REQUESTS and CONCURRENT_THREADS before each run, so these values
# only apply when run_stress_test() is called without that sweep.

# Number of requests submitted by one run_stress_test() call.
NUM_REQUESTS = 5
# Thread-pool size (max_workers) used to issue those requests in parallel.
CONCURRENT_THREADS = 10
# Endpoint that every request is POSTed to.
URL = "http://localhost:5000/"
def send_request(timeout=60.0):
    """POST one fixed search query to ``URL`` and time the round trip.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely, which would stall
            the whole stress test on a single hung connection.

    Returns:
        A ``(status_code, elapsed_seconds)`` tuple. When the request itself
        raises (connection refused, timeout, ...) the status is reported as
        500 and ``elapsed_seconds`` is the time spent before the failure, so
        failed requests no longer contribute a misleading 0 to the timings.
    """
    data = {
        'search': "A MRI, magnetic resonance imaging, scan is a very useful diagnosis tool.",
        'pipeline_select': '1',
    }
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments while the request is in flight.
    start_time = time.perf_counter()
    try:
        response = requests.post(URL, data=data, timeout=timeout)
    except Exception as e:
        # Deliberately broad: this runs inside a worker thread, and letting
        # an exception escape would make future.result() abort the whole run.
        print("Request failed:", e)
        return 500, time.perf_counter() - start_time  # Treat exceptions as failures

    elapsed = time.perf_counter() - start_time
    if response.status_code != 200:
        # Show only the start of the body to keep the console readable.
        print(f"Error {response.status_code}: {response.text[:100]}")
    return response.status_code, elapsed
def run_stress_test():
    """Fire ``NUM_REQUESTS`` requests through a thread pool and summarize them.

    The pool size is taken from the module-level ``CONCURRENT_THREADS`` and
    the request count from ``NUM_REQUESTS``; both are read at call time.
    Every request's timing is included in the statistics, whatever its
    status code.

    Returns:
        ``[total_requests, concurrency_level, avg_time, max_time]`` with the
        times in seconds. If no requests were issued, the times are 0.0.
    """
    with ThreadPoolExecutor(max_workers=CONCURRENT_THREADS) as executor:
        futures = [executor.submit(send_request) for _ in range(NUM_REQUESTS)]
        # Collect in submission order; result() blocks until each one is done.
        results = [future.result() for future in futures]

    # Derive the totals from what was actually collected so the statistics
    # always agree with the data they are computed from.
    total = len(results)
    successes = sum(1 for status, _ in results if status == 200)
    failures = total - successes

    timings = [elapsed for _, elapsed in results]
    if timings:
        avg_time = sum(timings) / total
        max_time = max(timings)
        min_time = min(timings)
    else:
        # Nothing was sent: avoid ZeroDivisionError and max()/min() on an
        # empty sequence, and report neutral values instead.
        avg_time = max_time = min_time = 0.0

    print("\n=== Stress Test Results ===")
    print(f"Total Requests: {total}")
    print(f"Concurrency Level: {CONCURRENT_THREADS}")
    print(f"Successes: {successes}")
    print(f"Failures: {failures}")
    print(f"Avg Time: {avg_time:.3f}s")
    print(f"Min Time: {min_time:.3f}s")
    print(f"Max Time: {max_time:.3f}s")
    return [total, CONCURRENT_THREADS, avg_time, max_time]
if __name__ == "__main__":
    # Sweep over increasing simulated user counts and record one summary row
    # per level in a CSV file.
    header_row = ['Total Requests', 'Concurrency Level', 'Avg Time', 'Max Time']
    user_levels = (1, 5, 10, 20, 50, 100)

    with open('stress_test_results.csv', 'w', newline='') as out_file:
        summary_writer = csv.writer(out_file)
        summary_writer.writerow(header_row)
        for level in user_levels:
            # run_stress_test() reads these module-level settings, so they
            # are rebound here before every run: one thread per simulated
            # user and five requests per user.
            CONCURRENT_THREADS, NUM_REQUESTS = level, level * 5
            summary_writer.writerow(run_stress_test())