import json from uuid import UUID import pytest from langflow.memory import aget_messages from langflow.services.database.models.flow import FlowCreate, FlowUpdate from orjson import orjson @pytest.mark.benchmark async def test_build_flow(client, json_memory_chatbot_no_llm, logged_in_headers): flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) async with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r: await consume_and_assert_stream(r) await check_messages(flow_id) @pytest.mark.benchmark async def test_build_flow_from_request_data(client, json_memory_chatbot_no_llm, logged_in_headers): flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) response = await client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers) flow_data = response.json() async with client.stream( "POST", f"api/v1/build/{flow_id}/flow", json={"data": flow_data["data"]}, headers=logged_in_headers ) as r: await consume_and_assert_stream(r) await check_messages(flow_id) async def test_build_flow_with_frozen_path(client, json_memory_chatbot_no_llm, logged_in_headers): flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) response = await client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers) flow_data = response.json() flow_data["data"]["nodes"][0]["data"]["node"]["frozen"] = True response = await client.patch( f"api/v1/flows/{flow_id}", json=FlowUpdate(name="Flow", description="description", data=flow_data["data"]).model_dump(), headers=logged_in_headers, ) response.raise_for_status() async with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r: await consume_and_assert_stream(r) await check_messages(flow_id) async def check_messages(flow_id): messages = await aget_messages(flow_id=UUID(flow_id), order="ASC") assert len(messages) == 2 assert messages[0].session_id == flow_id assert messages[0].sender == "User" assert messages[0].sender_name == "User" assert messages[0].text == "" assert messages[1].session_id == flow_id assert messages[1].sender == "Machine" assert messages[1].sender_name == "AI" async def consume_and_assert_stream(r): count = 0 async for line in r.aiter_lines(): # httpx split by \n, but ndjson sends two \n for each line if not line: continue parsed = json.loads(line) if count == 0: assert parsed["event"] == "vertices_sorted" ids = parsed["data"]["ids"] ids.sort() assert ids == ["ChatInput-CIGht"] to_run = parsed["data"]["to_run"] to_run.sort() assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"] elif count > 0 and count < 5: assert parsed["event"] == "end_vertex" assert parsed["data"]["build_data"] is not None elif count == 5: assert parsed["event"] == "end" else: msg = f"Unexpected line: {line}" raise ValueError(msg) count += 1 async def _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers): vector_store = orjson.loads(json_memory_chatbot_no_llm) data = vector_store["data"] vector_store = FlowCreate(name="Flow", description="description", data=data, endpoint_name="f") response = await client.post("api/v1/flows/", json=vector_store.model_dump(), headers=logged_in_headers) response.raise_for_status() return response.json()["id"]