from typing import Any

from aiofile import async_open
from fastapi import status
from httpx import AsyncClient
from langflow.api.v1.schemas import UpdateCustomComponentRequest


async def test_get_version(client: AsyncClient) -> None:
    """GET ``api/v1/version`` returns 200 and a dict with the version keys.

    Args:
        client: Async HTTP client used to call the API (presumably a pytest
            fixture defined outside this file).
    """
    response = await client.get("api/v1/version")
    # Assert the status before decoding so an error response is reported as a
    # status mismatch (with its body) rather than as a JSON decoding failure.
    assert response.status_code == status.HTTP_200_OK, response.text
    result = response.json()

    assert isinstance(result, dict), "The result must be a dictionary"
    assert "version" in result, "The dictionary must contain a key called 'version'"
    assert "main_version" in result, "The dictionary must contain a key called 'main_version'"
    assert "package" in result, "The dictionary must contain a key called 'package'"


async def test_get_config(client: AsyncClient) -> None:
    """GET ``api/v1/config`` returns 200 and a dict with the expected config keys.

    Args:
        client: Async HTTP client used to call the API (presumably a pytest
            fixture defined outside this file).
    """
    response = await client.get("api/v1/config")
    # Status first: see the failure body instead of a JSON decode error.
    assert response.status_code == status.HTTP_200_OK, response.text
    result = response.json()

    assert isinstance(result, dict), "The result must be a dictionary"
    assert "frontend_timeout" in result, "The dictionary must contain a key called 'frontend_timeout'"
    assert "auto_saving" in result, "The dictionary must contain a key called 'auto_saving'"
    assert "health_check_max_retries" in result, "The dictionary must contain a 'health_check_max_retries' key"
    assert "max_file_size_upload" in result, "The dictionary must contain a key called 'max_file_size_upload'"


async def test_update_component_outputs(client: AsyncClient, logged_in_headers: dict) -> None:
    """POST ``api/v1/custom_component/update`` returns an output named ``tool_output``.

    The component source is read from the test data file and submitted with
    ``show_output`` set to ``True`` and an initially empty ``outputs`` list.

    Args:
        client: Async HTTP client used to call the API (presumably a pytest
            fixture defined outside this file).
        logged_in_headers: Headers sent with the request (presumably carrying
            the authentication for a logged-in user; confirm against the
            fixture definition).
    """
    # NOTE(review): the path is relative, so it resolves against the current
    # working directory of the test run.
    async with async_open("src/backend/tests/data/dynamic_output_component.py", encoding="utf-8") as f:
        code = await f.read()

    frontend_node: dict[str, Any] = {"outputs": []}
    request = UpdateCustomComponentRequest(
        code=code,
        frontend_node=frontend_node,
        field="show_output",
        field_value=True,
        template={},
    )

    response = await client.post(
        "api/v1/custom_component/update",
        json=request.model_dump(),
        headers=logged_in_headers,
    )
    # Status first: see the failure body instead of a JSON decode error.
    assert response.status_code == status.HTTP_200_OK, response.text
    result = response.json()

    output_names = [output["name"] for output in result["outputs"]]
    assert "tool_output" in output_names