from __future__ import annotations from typing import TYPE_CHECKING, Any, cast from uuid import UUID from fastapi import HTTPException from pydantic.v1 import BaseModel, Field, create_model from sqlmodel import select from langflow.schema.schema import INPUT_FIELD_NAME from langflow.services.database.models.flow import Flow from langflow.services.database.models.flow.model import FlowRead from langflow.services.deps import async_session_scope, get_settings_service, session_scope if TYPE_CHECKING: from collections.abc import Awaitable, Callable from langflow.graph.graph.base import Graph from langflow.graph.schema import RunOutputs from langflow.graph.vertex.base import Vertex from langflow.schema import Data INPUT_TYPE_MAP = { "ChatInput": {"type_hint": "Optional[str]", "default": '""'}, "TextInput": {"type_hint": "Optional[str]", "default": '""'}, "JSONInput": {"type_hint": "Optional[dict]", "default": "{}"}, } def list_flows(*, user_id: str | None = None) -> list[Data]: if not user_id: msg = "Session is invalid" raise ValueError(msg) try: with session_scope() as session: flows = session.exec( select(Flow).where(Flow.user_id == user_id).where(Flow.is_component == False) # noqa: E712 ).all() return [flow.to_data() for flow in flows] except Exception as e: msg = f"Error listing flows: {e}" raise ValueError(msg) from e async def load_flow( user_id: str, flow_id: str | None = None, flow_name: str | None = None, tweaks: dict | None = None ) -> Graph: from langflow.graph.graph.base import Graph from langflow.processing.process import process_tweaks if not flow_id and not flow_name: msg = "Flow ID or Flow Name is required" raise ValueError(msg) if not flow_id and flow_name: flow_id = await find_flow(flow_name, user_id) if not flow_id: msg = f"Flow {flow_name} not found" raise ValueError(msg) async with async_session_scope() as session: graph_data = flow.data if (flow := await session.get(Flow, flow_id)) else None if not graph_data: msg = f"Flow {flow_id} not found" raise ValueError(msg) if tweaks: graph_data = process_tweaks(graph_data=graph_data, tweaks=tweaks) return Graph.from_payload(graph_data, flow_id=flow_id, user_id=user_id) async def find_flow(flow_name: str, user_id: str) -> str | None: async with async_session_scope() as session: stmt = select(Flow).where(Flow.name == flow_name).where(Flow.user_id == user_id) flow = (await session.exec(stmt)).first() return flow.id if flow else None async def run_flow( inputs: dict | list[dict] | None = None, tweaks: dict | None = None, flow_id: str | None = None, flow_name: str | None = None, output_type: str | None = "chat", user_id: str | None = None, run_id: str | None = None, session_id: str | None = None, graph: Graph | None = None, ) -> list[RunOutputs]: if user_id is None: msg = "Session is invalid" raise ValueError(msg) if graph is None: graph = await load_flow(user_id, flow_id, flow_name, tweaks) if run_id: graph.set_run_id(UUID(run_id)) if session_id: graph.session_id = session_id if user_id: graph.user_id = user_id if inputs is None: inputs = [] if isinstance(inputs, dict): inputs = [inputs] inputs_list = [] inputs_components = [] types = [] for input_dict in inputs: inputs_list.append({INPUT_FIELD_NAME: cast("str", input_dict.get("input_value"))}) inputs_components.append(input_dict.get("components", [])) types.append(input_dict.get("type", "chat")) outputs = [ vertex.id for vertex in graph.vertices if output_type == "debug" or ( vertex.is_output and (output_type == "any" or output_type in vertex.id.lower()) # type: ignore[operator] ) ] fallback_to_env_vars = get_settings_service().settings.fallback_to_env_var return await graph.arun( inputs_list, outputs=outputs, inputs_components=inputs_components, types=types, fallback_to_env_vars=fallback_to_env_vars, ) def generate_function_for_flow( inputs: list[Vertex], flow_id: str, user_id: str | UUID | None ) -> Callable[..., Awaitable[Any]]: """Generate a dynamic flow function based on the given inputs and flow ID. Args: inputs (List[Vertex]): The list of input vertices for the flow. flow_id (str): The ID of the flow. user_id (str | UUID | None): The user ID associated with the flow. Returns: Coroutine: The dynamic flow function. Raises: None Example: inputs = [vertex1, vertex2] flow_id = "my_flow" function = generate_function_for_flow(inputs, flow_id) result = function(input1, input2) """ # Prepare function arguments with type hints and default values args = [ ( f"{input_.display_name.lower().replace(' ', '_')}: {INPUT_TYPE_MAP[input_.base_name]['type_hint']} = " f"{INPUT_TYPE_MAP[input_.base_name]['default']}" ) for input_ in inputs ] # Maintain original argument names for constructing the tweaks dictionary original_arg_names = [input_.display_name for input_ in inputs] # Prepare a Pythonic, valid function argument string func_args = ", ".join(args) # Map original argument names to their corresponding Pythonic variable names in the function arg_mappings = ", ".join( f'"{original_name}": {name}' for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args], strict=True) ) func_body = f""" from typing import Optional async def flow_function({func_args}): tweaks = {{ {arg_mappings} }} from langflow.helpers.flow import run_flow from langchain_core.tools import ToolException from langflow.base.flow_processing.utils import build_data_from_result_data, format_flow_output_data try: run_outputs = await run_flow( tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}}, flow_id="{flow_id}", user_id="{user_id}" ) if not run_outputs: return [] run_output = run_outputs[0] data = [] if run_output is not None: for output in run_output.outputs: if output: data.extend(build_data_from_result_data(output)) return format_flow_output_data(data) except Exception as e: raise ToolException(f'Error running flow: ' + e) """ compiled_func = compile(func_body, "", "exec") local_scope: dict = {} exec(compiled_func, globals(), local_scope) # noqa: S102 return local_scope["flow_function"] def build_function_and_schema( flow_data: Data, graph: Graph, user_id: str | UUID | None ) -> tuple[Callable[..., Awaitable[Any]], type[BaseModel]]: """Builds a dynamic function and schema for a given flow. Args: flow_data (Data): The flow record containing information about the flow. graph (Graph): The graph representing the flow. user_id (str): The user ID associated with the flow. Returns: Tuple[Callable, BaseModel]: A tuple containing the dynamic function and the schema. """ flow_id = flow_data.id inputs = get_flow_inputs(graph) dynamic_flow_function = generate_function_for_flow(inputs, flow_id, user_id=user_id) schema = build_schema_from_inputs(flow_data.name, inputs) return dynamic_flow_function, schema def get_flow_inputs(graph: Graph) -> list[Vertex]: """Retrieves the flow inputs from the given graph. Args: graph (Graph): The graph object representing the flow. Returns: List[Data]: A list of input data, where each record contains the ID, name, and description of the input vertex. """ return [vertex for vertex in graph.vertices if vertex.is_input] def build_schema_from_inputs(name: str, inputs: list[Vertex]) -> type[BaseModel]: """Builds a schema from the given inputs. Args: name (str): The name of the schema. inputs (List[tuple[str, str, str]]): A list of tuples representing the inputs. Each tuple contains three elements: the input name, the input type, and the input description. Returns: BaseModel: The schema model. """ fields = {} for input_ in inputs: field_name = input_.display_name.lower().replace(" ", "_") description = input_.description fields[field_name] = (str, Field(default="", description=description)) return create_model(name, **fields) def get_arg_names(inputs: list[Vertex]) -> list[dict[str, str]]: """Returns a list of dictionaries containing the component name and its corresponding argument name. Args: inputs (List[Vertex]): A list of Vertex objects representing the inputs. Returns: List[dict[str, str]]: A list of dictionaries, where each dictionary contains the component name and its argument name. """ return [ {"component_name": input_.display_name, "arg_name": input_.display_name.lower().replace(" ", "_")} for input_ in inputs ] async def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: UUID | None = None) -> FlowRead | None: async with async_session_scope() as session: endpoint_name = None try: flow_id = UUID(flow_id_or_name) flow = await session.get(Flow, flow_id) except ValueError: endpoint_name = flow_id_or_name stmt = select(Flow).where(Flow.endpoint_name == endpoint_name) if user_id: stmt = stmt.where(Flow.user_id == user_id) flow = (await session.exec(stmt)).first() if flow is None: raise HTTPException(status_code=404, detail=f"Flow identifier {flow_id_or_name} not found") return FlowRead.model_validate(flow, from_attributes=True) async def generate_unique_flow_name(flow_name, user_id, session): original_name = flow_name n = 1 while True: # Check if a flow with the given name exists existing_flow = ( await session.exec( select(Flow).where( Flow.name == flow_name, Flow.user_id == user_id, ) ) ).first() # If no flow with the given name exists, return the name if not existing_flow: return flow_name # If a flow with the name already exists, append (n) to the name and increment n flow_name = f"{original_name} ({n})" n += 1