Tai Truong
fix readme
d202ada
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID
from fastapi import HTTPException
from pydantic.v1 import BaseModel, Field, create_model
from sqlmodel import select
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.flow.model import FlowRead
from langflow.services.deps import async_session_scope, get_settings_service, session_scope
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
from langflow.graph.graph.base import Graph
from langflow.graph.schema import RunOutputs
from langflow.graph.vertex.base import Vertex
from langflow.schema import Data
# Maps an input component's base name to the type hint and default value that
# generate_function_for_flow splices, as source text, into the parameter list
# of the function it generates. The values are code fragments (e.g. the
# default '""' is the source for an empty string), not runtime objects.
INPUT_TYPE_MAP = {
"ChatInput": {"type_hint": "Optional[str]", "default": '""'},
"TextInput": {"type_hint": "Optional[str]", "default": '""'},
"JSONInput": {"type_hint": "Optional[dict]", "default": "{}"},
}
def list_flows(*, user_id: str | None = None) -> list[Data]:
    """Return every non-component flow owned by ``user_id`` as ``Data`` records.

    Args:
        user_id: Owner of the flows to list (keyword-only).

    Returns:
        The ``to_data()`` result of each matching flow.

    Raises:
        ValueError: If ``user_id`` is missing or empty ("Session is invalid"),
            or wrapping any exception raised while querying or converting
            the flows ("Error listing flows: ...").
    """
    if not user_id:
        msg = "Session is invalid"
        raise ValueError(msg)

    try:
        with session_scope() as session:
            query = select(Flow).where(Flow.user_id == user_id).where(Flow.is_component == False)  # noqa: E712
            rows = session.exec(query).all()
            # Convert while the session is still open.
            return [row.to_data() for row in rows]
    except Exception as exc:
        msg = f"Error listing flows: {exc}"
        raise ValueError(msg) from exc
async def load_flow(
    user_id: str, flow_id: str | None = None, flow_name: str | None = None, tweaks: dict | None = None
) -> Graph:
    """Load a stored flow and build a ``Graph`` from it.

    The flow is looked up by ``flow_id`` when given; otherwise ``flow_name`` is
    resolved to an ID for ``user_id`` via ``find_flow``.

    Args:
        user_id: User the graph is built for (also used for name lookup).
        flow_id: ID of the flow to load.
        flow_name: Name of the flow, used only when ``flow_id`` is not given.
        tweaks: Optional tweaks applied to the flow data via ``process_tweaks``.

    Returns:
        The graph built with ``Graph.from_payload``.

    Raises:
        ValueError: If neither identifier is given, the name cannot be
            resolved, or the flow has no data / does not exist.
    """
    # Imported lazily inside the function, as in the original implementation.
    from langflow.graph.graph.base import Graph
    from langflow.processing.process import process_tweaks

    if not flow_id:
        if not flow_name:
            msg = "Flow ID or Flow Name is required"
            raise ValueError(msg)
        flow_id = await find_flow(flow_name, user_id)
        if not flow_id:
            msg = f"Flow {flow_name} not found"
            raise ValueError(msg)

    async with async_session_scope() as session:
        stored_flow = await session.get(Flow, flow_id)
        graph_data = stored_flow.data if stored_flow else None

    if not graph_data:
        msg = f"Flow {flow_id} not found"
        raise ValueError(msg)

    if tweaks:
        graph_data = process_tweaks(graph_data=graph_data, tweaks=tweaks)
    return Graph.from_payload(graph_data, flow_id=flow_id, user_id=user_id)
async def find_flow(flow_name: str, user_id: str) -> str | None:
    """Look up the ID of the first flow named ``flow_name`` owned by ``user_id``.

    Args:
        flow_name: Name of the flow to search for.
        user_id: Owner the flow must belong to.

    Returns:
        The ``id`` of the first matching flow, or ``None`` when there is none.
    """
    async with async_session_scope() as session:
        query = select(Flow).where(Flow.name == flow_name).where(Flow.user_id == user_id)
        result = await session.exec(query)
        match = result.first()
        if not match:
            return None
        return match.id
async def run_flow(
    inputs: dict | list[dict] | None = None,
    tweaks: dict | None = None,
    flow_id: str | None = None,
    flow_name: str | None = None,
    output_type: str | None = "chat",
    user_id: str | None = None,
    run_id: str | None = None,
    session_id: str | None = None,
    graph: Graph | None = None,
) -> list[RunOutputs]:
    """Run a flow and return the outputs of ``graph.arun``.

    Args:
        inputs: One input dict or a list of them. Each dict may carry
            ``"input_value"``, ``"components"`` (default ``[]``) and
            ``"type"`` (default ``"chat"``).
        tweaks: Tweaks forwarded to ``load_flow``; only used when ``graph`` is
            not supplied.
        flow_id: ID of the flow to load when ``graph`` is not supplied.
        flow_name: Name of the flow to load when ``graph`` is not supplied.
        output_type: ``"debug"`` selects every vertex, ``"any"`` selects every
            output vertex, any other value selects output vertices whose
            lowercased ID contains it.
        user_id: User running the flow; required.
        run_id: Optional run ID, parsed as a UUID and set on the graph.
        session_id: Optional session ID set on the graph.
        graph: Pre-built graph to run instead of loading one.

    Returns:
        Whatever ``graph.arun`` returns for the prepared inputs.

    Raises:
        ValueError: If ``user_id`` is ``None``.
    """
    if user_id is None:
        msg = "Session is invalid"
        raise ValueError(msg)

    if graph is None:
        graph = await load_flow(user_id, flow_id, flow_name, tweaks)

    if run_id:
        graph.set_run_id(UUID(run_id))
    if session_id:
        graph.session_id = session_id
    if user_id:
        graph.user_id = user_id

    # Normalise ``inputs`` to a list of dicts.
    if inputs is None:
        entries = []
    elif isinstance(inputs, dict):
        entries = [inputs]
    else:
        entries = inputs

    # Split each entry into the three parallel lists ``graph.arun`` expects.
    inputs_list = []
    inputs_components = []
    types = []
    for entry in entries:
        inputs_list.append({INPUT_FIELD_NAME: cast("str", entry.get("input_value"))})
        inputs_components.append(entry.get("components", []))
        types.append(entry.get("type", "chat"))

    # Pick the vertices whose results should be returned.
    outputs = []
    for vertex in graph.vertices:
        if output_type == "debug":
            outputs.append(vertex.id)
        elif vertex.is_output and (
            output_type == "any" or output_type in vertex.id.lower()  # type: ignore[operator]
        ):
            outputs.append(vertex.id)

    fallback_to_env_vars = get_settings_service().settings.fallback_to_env_var

    return await graph.arun(
        inputs_list,
        outputs=outputs,
        inputs_components=inputs_components,
        types=types,
        fallback_to_env_vars=fallback_to_env_vars,
    )
def generate_function_for_flow(
inputs: list[Vertex], flow_id: str, user_id: str | UUID | None
) -> Callable[..., Awaitable[Any]]:
"""Generate a dynamic flow function based on the given inputs and flow ID.
Args:
inputs (List[Vertex]): The list of input vertices for the flow.
flow_id (str): The ID of the flow.
user_id (str | UUID | None): The user ID associated with the flow.
Returns:
Coroutine: The dynamic flow function.
Raises:
None
Example:
inputs = [vertex1, vertex2]
flow_id = "my_flow"
function = generate_function_for_flow(inputs, flow_id)
result = function(input1, input2)
"""
# Prepare function arguments with type hints and default values
args = [
(
f"{input_.display_name.lower().replace(' ', '_')}: {INPUT_TYPE_MAP[input_.base_name]['type_hint']} = "
f"{INPUT_TYPE_MAP[input_.base_name]['default']}"
)
for input_ in inputs
]
# Maintain original argument names for constructing the tweaks dictionary
original_arg_names = [input_.display_name for input_ in inputs]
# Prepare a Pythonic, valid function argument string
func_args = ", ".join(args)
# Map original argument names to their corresponding Pythonic variable names in the function
arg_mappings = ", ".join(
f'"{original_name}": {name}'
for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args], strict=True)
)
func_body = f"""
from typing import Optional
async def flow_function({func_args}):
tweaks = {{ {arg_mappings} }}
from langflow.helpers.flow import run_flow
from langchain_core.tools import ToolException
from langflow.base.flow_processing.utils import build_data_from_result_data, format_flow_output_data
try:
run_outputs = await run_flow(
tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}},
flow_id="{flow_id}",
user_id="{user_id}"
)
if not run_outputs:
return []
run_output = run_outputs[0]
data = []
if run_output is not None:
for output in run_output.outputs:
if output:
data.extend(build_data_from_result_data(output))
return format_flow_output_data(data)
except Exception as e:
raise ToolException(f'Error running flow: ' + e)
"""
compiled_func = compile(func_body, "<string>", "exec")
local_scope: dict = {}
exec(compiled_func, globals(), local_scope) # noqa: S102
return local_scope["flow_function"]
def build_function_and_schema(
    flow_data: Data, graph: Graph, user_id: str | UUID | None
) -> tuple[Callable[..., Awaitable[Any]], type[BaseModel]]:
    """Create the callable and the argument schema for a flow.

    Args:
        flow_data (Data): The flow record; its ``id`` and ``name`` are used.
        graph (Graph): The graph of the flow, from which input vertices are taken.
        user_id (str | UUID | None): The user ID the generated function runs as.

    Returns:
        Tuple[Callable, type[BaseModel]]: The generated flow function and the
        schema model built from the flow's inputs.
    """
    flow_identifier = flow_data.id
    input_vertices = get_flow_inputs(graph)
    flow_runner = generate_function_for_flow(input_vertices, flow_identifier, user_id=user_id)
    input_schema = build_schema_from_inputs(flow_data.name, input_vertices)
    return flow_runner, input_schema
def get_flow_inputs(graph: Graph) -> list[Vertex]:
    """Collect the input vertices of a graph.

    Args:
        graph (Graph): The graph object representing the flow.

    Returns:
        List[Vertex]: The vertices of ``graph.vertices`` whose ``is_input`` is
        truthy, in their original order.
    """
    return list(filter(lambda vertex: vertex.is_input, graph.vertices))
def build_schema_from_inputs(name: str, inputs: list[Vertex]) -> type[BaseModel]:
    """Build a model class with one string field per input vertex.

    Args:
        name (str): The name of the schema model.
        inputs (List[Vertex]): The input vertices. Each contributes a field named
            after its display name (lowercased, spaces replaced by underscores),
            described by the vertex's ``description`` and defaulting to ``""``.

    Returns:
        type[BaseModel]: The model class created with ``create_model``.
    """
    fields = {
        input_.display_name.lower().replace(" ", "_"): (
            str,
            Field(default="", description=input_.description),
        )
        for input_ in inputs
    }
    return create_model(name, **fields)
def get_arg_names(inputs: list[Vertex]) -> list[dict[str, str]]:
    """Pair each input's display name with its argument name.

    Args:
        inputs (List[Vertex]): A list of Vertex objects representing the inputs.

    Returns:
        List[dict[str, str]]: One dictionary per input with ``"component_name"``
        (the display name) and ``"arg_name"`` (the display name lowercased with
        spaces replaced by underscores).
    """
    arg_names = []
    for input_ in inputs:
        component_name = input_.display_name
        arg_name = component_name.lower().replace(" ", "_")
        arg_names.append({"component_name": component_name, "arg_name": arg_name})
    return arg_names
async def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: UUID | None = None) -> FlowRead | None:
    """Fetch a flow by its UUID or, failing that, by its endpoint name.

    Args:
        flow_id_or_name: Either the string form of the flow's UUID or the
            flow's endpoint name.
        user_id: When given, restricts the endpoint-name lookup to flows owned
            by this user. It is not applied to the lookup by UUID.

    Returns:
        The flow validated into a ``FlowRead``.

    Raises:
        HTTPException: With status 404 when no matching flow is found.
    """
    async with async_session_scope() as session:
        try:
            flow = await session.get(Flow, UUID(flow_id_or_name))
        except ValueError:
            # A ValueError here normally means the identifier is not a valid
            # UUID, so fall back to treating it as an endpoint name.
            query = select(Flow).where(Flow.endpoint_name == flow_id_or_name)
            if user_id:
                query = query.where(Flow.user_id == user_id)
            flow = (await session.exec(query)).first()

        if flow is None:
            raise HTTPException(status_code=404, detail=f"Flow identifier {flow_id_or_name} not found")
        return FlowRead.model_validate(flow, from_attributes=True)
async def generate_unique_flow_name(flow_name, user_id, session):
    """Return a flow name that ``user_id`` does not use yet.

    ``flow_name`` is returned unchanged when no flow of that user has it.
    Otherwise ``"<flow_name> (1)"``, ``"<flow_name> (2)"``, ... are tried in
    order and the first unused one is returned.

    Args:
        flow_name: The desired flow name.
        user_id: Owner whose existing flows are checked.
        session: Session whose awaitable ``exec`` runs the lookup queries.

    Returns:
        The first candidate name for which no flow was found.
    """
    candidate = flow_name
    suffix = 0
    while True:
        query = select(Flow).where(
            Flow.name == candidate,
            Flow.user_id == user_id,
        )
        result = await session.exec(query)
        # The candidate is free when the query finds no flow.
        if not result.first():
            return candidate
        # Name taken: try the next numbered variant of the original name.
        suffix += 1
        candidate = f"{flow_name} ({suffix})"