Spaces:
Running
Running
import random | |
import time | |
from pathlib import Path | |
import httpx | |
import orjson | |
from locust import FastHttpUser, between, task | |
from rich import print # noqa: A004 | |
class NameTest(FastHttpUser): | |
wait_time = between(1, 5) | |
with Path("names.txt").open(encoding="utf-8") as file: | |
names = [line.strip() for line in file] | |
headers: dict = {} | |
def poll_task(self, task_id, sleep_time=1): | |
while True: | |
with self.rest( | |
"GET", | |
f"/task/{task_id}", | |
name="task_status", | |
headers=self.headers, | |
) as response: | |
status = response.js.get("status") | |
print(f"Poll Response: {response.js}") | |
if status == "SUCCESS": | |
return response.js.get("result") | |
if status in {"FAILURE", "REVOKED"}: | |
msg = f"Task failed with status: {status}" | |
raise ValueError(msg) | |
time.sleep(sleep_time) | |
def process(self, name, flow_id, payload): | |
task_id = None | |
print(f"Processing {payload}") | |
with self.rest( | |
"POST", | |
f"/process/{flow_id}", | |
json=payload, | |
name="process", | |
headers=self.headers, | |
) as response: | |
print(response.js) | |
if response.status_code != 200: | |
response.failure("Process call failed") | |
msg = "Process call failed" | |
raise ValueError(msg) | |
task_id = response.js.get("id") | |
session_id = response.js.get("session_id") | |
assert task_id, "Inner Task ID not found" | |
assert task_id, "Task ID not found" | |
result = self.poll_task(task_id) | |
print(f"Result for {name}: {result}") | |
return result, session_id | |
def send_name_and_check(self): | |
name = random.choice(self.names) # noqa: S311 | |
payload1 = { | |
"inputs": {"text": f"Hello, My name is {name}"}, | |
"sync": False, | |
} | |
_result1, session_id = self.process(name, self.flow_id, payload1) | |
payload2 = { | |
"inputs": {"text": "What is my name? Please, answer like this: Your name is <name>"}, | |
"session_id": session_id, | |
"sync": False, | |
} | |
result2, session_id = self.process(name, self.flow_id, payload2) | |
assert f"Your name is {name}" in str(result2), "Name not found in response" | |
def on_start(self): | |
print("Starting") | |
login_data = {"username": "superuser", "password": "superuser"} | |
response = httpx.post(f"{self.host}/login", data=login_data) | |
print(response.json()) | |
tokens = response.json() | |
print(tokens) | |
a_token = tokens["access_token"] | |
logged_in_headers = {"Authorization": f"Bearer {a_token}"} | |
print("Logged in") | |
json_flow = (Path(__file__).parent.parent / "data" / "BasicChatwithPromptandHistory.json").read_text( | |
encoding="utf-8" | |
) | |
flow = orjson.loads(json_flow) | |
data = flow["data"] | |
# Create test data | |
flow = {"name": "Flow 1", "description": "description", "data": data} | |
print("Creating flow") | |
# Make request to endpoint | |
response = httpx.post( | |
f"{self.host}/flows/", | |
json=flow, | |
headers=logged_in_headers, | |
) | |
self.flow_id = response.json()["id"] | |
print(f"Flow ID: {self.flow_id}") | |
# read all users | |
response = httpx.get( | |
f"{self.host}/users/", | |
headers=logged_in_headers, | |
) | |
print(response.json()) | |
user_id = next( | |
(user["id"] for user in response.json()["users"] if user["username"] == "superuser"), | |
None, | |
) | |
# Create api key | |
response = httpx.post( | |
f"{self.host}/api_key/", | |
json={"user_id": user_id}, | |
headers=logged_in_headers, | |
) | |
print(response.json()) | |
self.headers["x-api-key"] = response.json()["api_key"] | |