Tai Truong
fix readme
d202ada
raw
history blame
4.11 kB
import random
import time
from pathlib import Path
import httpx
import orjson
from locust import FastHttpUser, between, task
from rich import print # noqa: A004
class NameTest(FastHttpUser):
wait_time = between(1, 5)
with Path("names.txt").open(encoding="utf-8") as file:
names = [line.strip() for line in file]
headers: dict = {}
def poll_task(self, task_id, sleep_time=1):
while True:
with self.rest(
"GET",
f"/task/{task_id}",
name="task_status",
headers=self.headers,
) as response:
status = response.js.get("status")
print(f"Poll Response: {response.js}")
if status == "SUCCESS":
return response.js.get("result")
if status in {"FAILURE", "REVOKED"}:
msg = f"Task failed with status: {status}"
raise ValueError(msg)
time.sleep(sleep_time)
def process(self, name, flow_id, payload):
task_id = None
print(f"Processing {payload}")
with self.rest(
"POST",
f"/process/{flow_id}",
json=payload,
name="process",
headers=self.headers,
) as response:
print(response.js)
if response.status_code != 200:
response.failure("Process call failed")
msg = "Process call failed"
raise ValueError(msg)
task_id = response.js.get("id")
session_id = response.js.get("session_id")
assert task_id, "Inner Task ID not found"
assert task_id, "Task ID not found"
result = self.poll_task(task_id)
print(f"Result for {name}: {result}")
return result, session_id
@task
def send_name_and_check(self):
name = random.choice(self.names) # noqa: S311
payload1 = {
"inputs": {"text": f"Hello, My name is {name}"},
"sync": False,
}
_result1, session_id = self.process(name, self.flow_id, payload1)
payload2 = {
"inputs": {"text": "What is my name? Please, answer like this: Your name is <name>"},
"session_id": session_id,
"sync": False,
}
result2, session_id = self.process(name, self.flow_id, payload2)
assert f"Your name is {name}" in str(result2), "Name not found in response"
def on_start(self):
print("Starting")
login_data = {"username": "superuser", "password": "superuser"}
response = httpx.post(f"{self.host}/login", data=login_data)
print(response.json())
tokens = response.json()
print(tokens)
a_token = tokens["access_token"]
logged_in_headers = {"Authorization": f"Bearer {a_token}"}
print("Logged in")
json_flow = (Path(__file__).parent.parent / "data" / "BasicChatwithPromptandHistory.json").read_text(
encoding="utf-8"
)
flow = orjson.loads(json_flow)
data = flow["data"]
# Create test data
flow = {"name": "Flow 1", "description": "description", "data": data}
print("Creating flow")
# Make request to endpoint
response = httpx.post(
f"{self.host}/flows/",
json=flow,
headers=logged_in_headers,
)
self.flow_id = response.json()["id"]
print(f"Flow ID: {self.flow_id}")
# read all users
response = httpx.get(
f"{self.host}/users/",
headers=logged_in_headers,
)
print(response.json())
user_id = next(
(user["id"] for user in response.json()["users"] if user["username"] == "superuser"),
None,
)
# Create api key
response = httpx.post(
f"{self.host}/api_key/",
json={"user_id": user_id},
headers=logged_in_headers,
)
print(response.json())
self.headers["x-api-key"] = response.json()["api_key"]