Spaces:
Running
Running
import json | |
from uuid import UUID | |
import pytest | |
from langflow.memory import aget_messages | |
from langflow.services.database.models.flow import FlowCreate, FlowUpdate | |
from orjson import orjson | |
async def test_build_flow(client, json_memory_chatbot_no_llm, logged_in_headers): | |
flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) | |
async with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r: | |
await consume_and_assert_stream(r) | |
await check_messages(flow_id) | |
async def test_build_flow_from_request_data(client, json_memory_chatbot_no_llm, logged_in_headers): | |
flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) | |
response = await client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers) | |
flow_data = response.json() | |
async with client.stream( | |
"POST", f"api/v1/build/{flow_id}/flow", json={"data": flow_data["data"]}, headers=logged_in_headers | |
) as r: | |
await consume_and_assert_stream(r) | |
await check_messages(flow_id) | |
async def test_build_flow_with_frozen_path(client, json_memory_chatbot_no_llm, logged_in_headers): | |
flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) | |
response = await client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers) | |
flow_data = response.json() | |
flow_data["data"]["nodes"][0]["data"]["node"]["frozen"] = True | |
response = await client.patch( | |
f"api/v1/flows/{flow_id}", | |
json=FlowUpdate(name="Flow", description="description", data=flow_data["data"]).model_dump(), | |
headers=logged_in_headers, | |
) | |
response.raise_for_status() | |
async with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r: | |
await consume_and_assert_stream(r) | |
await check_messages(flow_id) | |
async def check_messages(flow_id): | |
messages = await aget_messages(flow_id=UUID(flow_id), order="ASC") | |
assert len(messages) == 2 | |
assert messages[0].session_id == flow_id | |
assert messages[0].sender == "User" | |
assert messages[0].sender_name == "User" | |
assert messages[0].text == "" | |
assert messages[1].session_id == flow_id | |
assert messages[1].sender == "Machine" | |
assert messages[1].sender_name == "AI" | |
async def consume_and_assert_stream(r): | |
count = 0 | |
async for line in r.aiter_lines(): | |
# httpx split by \n, but ndjson sends two \n for each line | |
if not line: | |
continue | |
parsed = json.loads(line) | |
if count == 0: | |
assert parsed["event"] == "vertices_sorted" | |
ids = parsed["data"]["ids"] | |
ids.sort() | |
assert ids == ["ChatInput-CIGht"] | |
to_run = parsed["data"]["to_run"] | |
to_run.sort() | |
assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"] | |
elif count > 0 and count < 5: | |
assert parsed["event"] == "end_vertex" | |
assert parsed["data"]["build_data"] is not None | |
elif count == 5: | |
assert parsed["event"] == "end" | |
else: | |
msg = f"Unexpected line: {line}" | |
raise ValueError(msg) | |
count += 1 | |
async def _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers): | |
vector_store = orjson.loads(json_memory_chatbot_no_llm) | |
data = vector_store["data"] | |
vector_store = FlowCreate(name="Flow", description="description", data=data, endpoint_name="f") | |
response = await client.post("api/v1/flows/", json=vector_store.model_dump(), headers=logged_in_headers) | |
response.raise_for_status() | |
return response.json()["id"] | |