Tai Truong
fix readme
d202ada
from __future__ import annotations
import ast
import asyncio
import inspect
from collections.abc import AsyncIterator, Iterator
from copy import deepcopy
from textwrap import dedent
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, get_type_hints
import nanoid
import yaml
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, ValidationError
from langflow.base.tools.constants import (
TOOL_OUTPUT_DISPLAY_NAME,
TOOL_OUTPUT_NAME,
TOOL_TABLE_SCHEMA,
TOOLS_METADATA_INPUT_NAME,
)
from langflow.custom.tree_visitor import RequiredInputsVisitor
from langflow.exceptions.component import StreamingError
from langflow.field_typing import Tool # noqa: TC001 Needed by _add_toolkit_output
from langflow.graph.state.model import create_state_model
from langflow.helpers.custom import format_type
from langflow.memory import astore_message, aupdate_messages, delete_message
from langflow.schema.artifact import get_artifact_type, post_process_raw
from langflow.schema.data import Data
from langflow.schema.message import ErrorMessage, Message
from langflow.schema.properties import Source
from langflow.services.tracing.schema import Log
from langflow.template.field.base import UNDEFINED, Input, Output
from langflow.template.frontend_node.custom_components import ComponentFrontendNode
from langflow.utils.async_helpers import run_until_complete
from langflow.utils.util import find_closest_match
from .custom_component import CustomComponent
if TYPE_CHECKING:
from collections.abc import Callable
from langflow.events.event_manager import EventManager
from langflow.graph.edge.schema import EdgeData
from langflow.graph.vertex.base import Vertex
from langflow.inputs.inputs import InputTypes
from langflow.schema import dotdict
from langflow.schema.log import LoggableType
_ComponentToolkit = None
def _get_component_toolkit():
global _ComponentToolkit # noqa: PLW0603
if _ComponentToolkit is None:
from langflow.base.tools.component_tool import ComponentToolkit
_ComponentToolkit = ComponentToolkit
return _ComponentToolkit
BACKWARDS_COMPATIBLE_ATTRIBUTES = ["user_id", "vertex", "tracing_service"]
CONFIG_ATTRIBUTES = ["_display_name", "_description", "_icon", "_name", "_metadata"]
class PlaceholderGraph(NamedTuple):
"""A placeholder graph structure for components, providing backwards compatibility.
and enabling component execution without a full graph object.
This lightweight structure contains essential information typically found in a complete graph,
allowing components to function in isolation or in simplified contexts.
Attributes:
flow_id (str | None): Unique identifier for the flow, if applicable.
user_id (str | None): Identifier of the user associated with the flow, if any.
session_id (str | None): Identifier for the current session, if applicable.
context (dict): Additional contextual information for the component's execution.
flow_name (str | None): Name of the flow, if available.
"""
flow_id: str | None
user_id: str | None
session_id: str | None
context: dict
flow_name: str | None
class Component(CustomComponent):
inputs: list[InputTypes] = []
outputs: list[Output] = []
code_class_base_inheritance: ClassVar[str] = "Component"
_output_logs: dict[str, list[Log]] = {}
_current_output: str = ""
_metadata: dict = {}
_ctx: dict = {}
_code: str | None = None
_logs: list[Log] = []
def __init__(self, **kwargs) -> None:
# if key starts with _ it is a config
# else it is an input
inputs = {}
config = {}
for key, value in kwargs.items():
if key.startswith("_"):
config[key] = value
elif key in CONFIG_ATTRIBUTES:
config[key[1:]] = value
else:
inputs[key] = value
self._inputs: dict[str, InputTypes] = {}
self._outputs_map: dict[str, Output] = {}
self._results: dict[str, Any] = {}
self._attributes: dict[str, Any] = {}
self._parameters = inputs or {}
self._edges: list[EdgeData] = []
self._components: list[Component] = []
self._current_output = ""
self._event_manager: EventManager | None = None
self._state_model = None
self.set_attributes(self._parameters)
self._output_logs = {}
config = config or {}
if "_id" not in config:
config |= {"_id": f"{self.__class__.__name__}-{nanoid.generate(size=5)}"}
self.__inputs = inputs
self.__config = config
self._reset_all_output_values()
super().__init__(**config)
if hasattr(self, "_trace_type"):
self.trace_type = self._trace_type
if not hasattr(self, "trace_type"):
self.trace_type = "chain"
if self.inputs is not None:
self.map_inputs(self.inputs)
if self.outputs is not None:
self.map_outputs(self.outputs)
# Set output types
self._set_output_types(list(self._outputs_map.values()))
self.set_class_code()
self._set_output_required_inputs()
@property
def ctx(self):
if not hasattr(self, "graph") or self.graph is None:
msg = "Graph not found. Please build the graph first."
raise ValueError(msg)
return self.graph.context
def add_to_ctx(self, key: str, value: Any, *, overwrite: bool = False) -> None:
"""Add a key-value pair to the context.
Args:
key (str): The key to add.
value (Any): The value to associate with the key.
overwrite (bool, optional): Whether to overwrite the existing value. Defaults to False.
Raises:
ValueError: If the graph is not built.
"""
if not hasattr(self, "graph") or self.graph is None:
msg = "Graph not found. Please build the graph first."
raise ValueError(msg)
if key in self.graph.context and not overwrite:
msg = f"Key {key} already exists in context. Set overwrite=True to overwrite."
raise ValueError(msg)
self.graph.context.update({key: value})
def update_ctx(self, value_dict: dict[str, Any]) -> None:
"""Update the context with a dictionary of values.
Args:
value_dict (dict[str, Any]): The dictionary of values to update.
Raises:
ValueError: If the graph is not built.
"""
if not hasattr(self, "graph") or self.graph is None:
msg = "Graph not found. Please build the graph first."
raise ValueError(msg)
if not isinstance(value_dict, dict):
msg = "Value dict must be a dictionary"
raise TypeError(msg)
self.graph.context.update(value_dict)
def _pre_run_setup(self):
pass
def set_event_manager(self, event_manager: EventManager | None = None) -> None:
self._event_manager = event_manager
def _reset_all_output_values(self) -> None:
if isinstance(self._outputs_map, dict):
for output in self._outputs_map.values():
output.value = UNDEFINED
def _build_state_model(self):
if self._state_model:
return self._state_model
name = self.name or self.__class__.__name__
model_name = f"{name}StateModel"
fields = {}
for output in self._outputs_map.values():
fields[output.name] = getattr(self, output.method)
self._state_model = create_state_model(model_name=model_name, **fields)
return self._state_model
def get_state_model_instance_getter(self):
state_model = self._build_state_model()
def _instance_getter(_):
return state_model()
_instance_getter.__annotations__["return"] = state_model
return _instance_getter
def __deepcopy__(self, memo: dict) -> Component:
if id(self) in memo:
return memo[id(self)]
kwargs = deepcopy(self.__config, memo)
kwargs["inputs"] = deepcopy(self.__inputs, memo)
new_component = type(self)(**kwargs)
new_component._code = self._code
new_component._outputs_map = self._outputs_map
new_component._inputs = self._inputs
new_component._edges = self._edges
new_component._components = self._components
new_component._parameters = self._parameters
new_component._attributes = self._attributes
new_component._output_logs = self._output_logs
new_component._logs = self._logs # type: ignore[attr-defined]
memo[id(self)] = new_component
return new_component
def set_class_code(self) -> None:
# Get the source code of the calling class
if self._code:
return
try:
module = inspect.getmodule(self.__class__)
if module is None:
msg = "Could not find module for class"
raise ValueError(msg)
class_code = inspect.getsource(module)
self._code = class_code
except OSError as e:
msg = f"Could not find source code for {self.__class__.__name__}"
raise ValueError(msg) from e
def set(self, **kwargs):
"""Connects the component to other components or sets parameters and attributes.
Args:
**kwargs: Keyword arguments representing the connections, parameters, and attributes.
Returns:
None
Raises:
KeyError: If the specified input name does not exist.
"""
for key, value in kwargs.items():
self._process_connection_or_parameters(key, value)
return self
def list_inputs(self):
"""Returns a list of input names."""
return [_input.name for _input in self.inputs]
def list_outputs(self):
"""Returns a list of output names."""
return [_output.name for _output in self._outputs_map.values()]
async def run(self):
"""Executes the component's logic and returns the result.
Returns:
The result of executing the component's logic.
"""
return await self._run()
def set_vertex(self, vertex: Vertex) -> None:
"""Sets the vertex for the component.
Args:
vertex (Vertex): The vertex to set.
Returns:
None
"""
self._vertex = vertex
def get_input(self, name: str) -> Any:
"""Retrieves the value of the input with the specified name.
Args:
name (str): The name of the input.
Returns:
Any: The value of the input.
Raises:
ValueError: If the input with the specified name is not found.
"""
if name in self._inputs:
return self._inputs[name]
msg = f"Input {name} not found in {self.__class__.__name__}"
raise ValueError(msg)
def get_output(self, name: str) -> Any:
"""Retrieves the output with the specified name.
Args:
name (str): The name of the output to retrieve.
Returns:
Any: The output value.
Raises:
ValueError: If the output with the specified name is not found.
"""
if name in self._outputs_map:
return self._outputs_map[name]
msg = f"Output {name} not found in {self.__class__.__name__}"
raise ValueError(msg)
def set_on_output(self, name: str, **kwargs) -> None:
output = self.get_output(name)
for key, value in kwargs.items():
if not hasattr(output, key):
msg = f"Output {name} does not have a method {key}"
raise ValueError(msg)
setattr(output, key, value)
def set_output_value(self, name: str, value: Any) -> None:
if name in self._outputs_map:
self._outputs_map[name].value = value
else:
msg = f"Output {name} not found in {self.__class__.__name__}"
raise ValueError(msg)
def map_outputs(self, outputs: list[Output]) -> None:
"""Maps the given list of outputs to the component.
Args:
outputs (List[Output]): The list of outputs to be mapped.
Raises:
ValueError: If the output name is None.
Returns:
None
"""
for output in outputs:
if output.name is None:
msg = "Output name cannot be None."
raise ValueError(msg)
# Deepcopy is required to avoid modifying the original component;
# allows each instance of each component to modify its own output
self._outputs_map[output.name] = deepcopy(output)
def map_inputs(self, inputs: list[InputTypes]) -> None:
"""Maps the given inputs to the component.
Args:
inputs (List[InputTypes]): A list of InputTypes objects representing the inputs.
Raises:
ValueError: If the input name is None.
"""
for input_ in inputs:
if input_.name is None:
msg = "Input name cannot be None."
raise ValueError(msg)
self._inputs[input_.name] = deepcopy(input_)
def validate(self, params: dict) -> None:
"""Validates the component parameters.
Args:
params (dict): A dictionary containing the component parameters.
Raises:
ValueError: If the inputs are not valid.
ValueError: If the outputs are not valid.
"""
self._validate_inputs(params)
self._validate_outputs()
def update_inputs(
self,
build_config: dotdict,
field_value: Any,
field_name: str | None = None,
):
return self.update_build_config(build_config, field_value, field_name)
def run_and_validate_update_outputs(self, frontend_node: dict, field_name: str, field_value: Any):
frontend_node = self.update_outputs(frontend_node, field_name, field_value)
if field_name == "tool_mode" or frontend_node.get("tool_mode"):
is_tool_mode = field_value or frontend_node.get("tool_mode")
frontend_node["outputs"] = [self._build_tool_output()] if is_tool_mode else frontend_node["outputs"]
if is_tool_mode:
frontend_node.setdefault("template", {})
frontend_node["template"][TOOLS_METADATA_INPUT_NAME] = self._build_tools_metadata_input().to_dict()
elif "template" in frontend_node:
frontend_node["template"].pop(TOOLS_METADATA_INPUT_NAME, None)
self.tools_metadata = frontend_node.get("template", {}).get(TOOLS_METADATA_INPUT_NAME, {}).get("value")
return self._validate_frontend_node(frontend_node)
def _validate_frontend_node(self, frontend_node: dict):
# Check if all outputs are either Output or a valid Output model
for index, output in enumerate(frontend_node["outputs"]):
if isinstance(output, dict):
try:
output_ = Output(**output)
self._set_output_return_type(output_)
output_dict = output_.model_dump()
except ValidationError as e:
msg = f"Invalid output: {e}"
raise ValueError(msg) from e
elif isinstance(output, Output):
# we need to serialize it
self._set_output_return_type(output)
output_dict = output.model_dump()
else:
msg = f"Invalid output type: {type(output)}"
raise TypeError(msg)
frontend_node["outputs"][index] = output_dict
return frontend_node
def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict: # noqa: ARG002
"""Default implementation for updating outputs based on field changes.
Subclasses can override this to modify outputs based on field_name and field_value.
"""
return frontend_node
def _set_output_types(self, outputs: list[Output]) -> None:
for output in outputs:
self._set_output_return_type(output)
def _set_output_return_type(self, output: Output) -> None:
if output.method is None:
msg = f"Output {output.name} does not have a method"
raise ValueError(msg)
return_types = self._get_method_return_type(output.method)
output.add_types(return_types)
output.set_selected()
def _set_output_required_inputs(self) -> None:
for output in self.outputs:
if not output.method:
continue
method = getattr(self, output.method, None)
if not method or not callable(method):
continue
try:
source_code = inspect.getsource(method)
ast_tree = ast.parse(dedent(source_code))
except Exception: # noqa: BLE001
ast_tree = ast.parse(dedent(self._code or ""))
visitor = RequiredInputsVisitor(self._inputs)
visitor.visit(ast_tree)
output.required_inputs = sorted(visitor.required_inputs)
def get_output_by_method(self, method: Callable):
# method is a callable and output.method is a string
# we need to find the output that has the same method
output = next((output for output in self._outputs_map.values() if output.method == method.__name__), None)
if output is None:
method_name = method.__name__ if hasattr(method, "__name__") else str(method)
msg = f"Output with method {method_name} not found"
raise ValueError(msg)
return output
def _inherits_from_component(self, method: Callable):
# check if the method is a method from a class that inherits from Component
# and that it is an output of that class
return hasattr(method, "__self__") and isinstance(method.__self__, Component)
def _method_is_valid_output(self, method: Callable):
# check if the method is a method from a class that inherits from Component
# and that it is an output of that class
return (
hasattr(method, "__self__")
and isinstance(method.__self__, Component)
and method.__self__.get_output_by_method(method)
)
def _build_error_string_from_matching_pairs(self, matching_pairs: list[tuple[Output, Input]]):
text = ""
for output, input_ in matching_pairs:
text += f"{output.name}[{','.join(output.types)}]->{input_.name}[{','.join(input_.input_types or [])}]\n"
return text
def _find_matching_output_method(self, input_name: str, value: Component):
"""Find the output method from the given component and input name.
Find the output method from the given component (`value`) that matches the specified input (`input_name`)
in the current component.
This method searches through all outputs of the provided component to find outputs whose types match
the input types of the specified input in the current component. If exactly one matching output is found,
it returns the corresponding method. If multiple matching outputs are found, it raises an error indicating
ambiguity. If no matching outputs are found, it raises an error indicating that no suitable output was found.
Args:
input_name (str): The name of the input in the current component to match.
value (Component): The component whose outputs are to be considered.
Returns:
Callable: The method corresponding to the matching output.
Raises:
ValueError: If multiple matching outputs are found, if no matching outputs are found,
or if the output method is invalid.
"""
# Retrieve all outputs from the given component
outputs = value._outputs_map.values()
# Prepare to collect matching output-input pairs
matching_pairs = []
# Get the input object from the current component
input_ = self._inputs[input_name]
# Iterate over outputs to find matches based on types
matching_pairs = [
(output, input_)
for output in outputs
for output_type in output.types
# Check if the output type matches the input's accepted types
if input_.input_types and output_type in input_.input_types
]
# If multiple matches are found, raise an error indicating ambiguity
if len(matching_pairs) > 1:
matching_pairs_str = self._build_error_string_from_matching_pairs(matching_pairs)
msg = (
f"There are multiple outputs from {value.__class__.__name__} "
f"that can connect to inputs in {self.__class__.__name__}: {matching_pairs_str}"
)
# If no matches are found, raise an error indicating no suitable output
if not matching_pairs:
msg = (
f"No matching output from {value.__class__.__name__} found for input '{input_name}' "
f"in {self.__class__.__name__}."
)
raise ValueError(msg)
# Get the matching output and input pair
output, input_ = matching_pairs[0]
# Ensure that the output method is a valid method name (string)
if not isinstance(output.method, str):
msg = f"Method {output.method} is not a valid output of {value.__class__.__name__}"
raise TypeError(msg)
return getattr(value, output.method)
def _process_connection_or_parameter(self, key, value) -> None:
input_ = self._get_or_create_input(key)
# We need to check if callable AND if it is a method from a class that inherits from Component
if isinstance(value, Component):
# We need to find the Output that can connect to an input of the current component
# if there's more than one output that matches, we need to raise an error
# because we don't know which one to connect to
value = self._find_matching_output_method(key, value)
if callable(value) and self._inherits_from_component(value):
try:
self._method_is_valid_output(value)
except ValueError as e:
msg = f"Method {value.__name__} is not a valid output of {value.__self__.__class__.__name__}"
raise ValueError(msg) from e
self._connect_to_component(key, value, input_)
else:
self._set_parameter_or_attribute(key, value)
def _process_connection_or_parameters(self, key, value) -> None:
# if value is a list of components, we need to process each component
# Note this update make sure it is not a list str | int | float | bool | type(None)
if isinstance(value, list) and not any(
isinstance(val, str | int | float | bool | type(None) | Message | Data | StructuredTool) for val in value
):
for val in value:
self._process_connection_or_parameter(key, val)
else:
self._process_connection_or_parameter(key, value)
def _get_or_create_input(self, key):
try:
return self._inputs[key]
except KeyError:
input_ = self._get_fallback_input(name=key, display_name=key)
self._inputs[key] = input_
self.inputs.append(input_)
return input_
def _connect_to_component(self, key, value, input_) -> None:
component = value.__self__
self._components.append(component)
output = component.get_output_by_method(value)
self._add_edge(component, key, output, input_)
def _add_edge(self, component, key, output, input_) -> None:
self._edges.append(
{
"source": component._id,
"target": self._id,
"data": {
"sourceHandle": {
"dataType": component.name or component.__class__.__name__,
"id": component._id,
"name": output.name,
"output_types": output.types,
},
"targetHandle": {
"fieldName": key,
"id": self._id,
"inputTypes": input_.input_types,
"type": input_.field_type,
},
},
}
)
def _set_parameter_or_attribute(self, key, value) -> None:
if isinstance(value, Component):
methods = ", ".join([f"'{output.method}'" for output in value.outputs])
msg = (
f"You set {value.display_name} as value for `{key}`. "
f"You should pass one of the following: {methods}"
)
raise TypeError(msg)
self._set_input_value(key, value)
self._parameters[key] = value
self._attributes[key] = value
def __call__(self, **kwargs):
self.set(**kwargs)
return run_until_complete(self.run())
async def _run(self):
# Resolve callable inputs
for key, _input in self._inputs.items():
if asyncio.iscoroutinefunction(_input.value):
self._inputs[key].value = await _input.value()
elif callable(_input.value):
self._inputs[key].value = await asyncio.to_thread(_input.value)
self.set_attributes({})
return await self.build_results()
def __getattr__(self, name: str) -> Any:
if "_attributes" in self.__dict__ and name in self.__dict__["_attributes"]:
return self.__dict__["_attributes"][name]
if "_inputs" in self.__dict__ and name in self.__dict__["_inputs"]:
return self.__dict__["_inputs"][name].value
if "_outputs_map" in self.__dict__ and name in self.__dict__["_outputs_map"]:
return self.__dict__["_outputs_map"][name]
if name in BACKWARDS_COMPATIBLE_ATTRIBUTES:
return self.__dict__[f"_{name}"]
if name.startswith("_") and name[1:] in BACKWARDS_COMPATIBLE_ATTRIBUTES:
return self.__dict__[name]
if name == "graph":
# If it got up to here it means it was going to raise
session_id = self._session_id if hasattr(self, "_session_id") else None
user_id = self._user_id if hasattr(self, "_user_id") else None
flow_name = self._flow_name if hasattr(self, "_flow_name") else None
flow_id = self._flow_id if hasattr(self, "_flow_id") else None
return PlaceholderGraph(
flow_id=flow_id, user_id=str(user_id), session_id=session_id, context={}, flow_name=flow_name
)
msg = f"{name} not found in {self.__class__.__name__}"
raise AttributeError(msg)
def _set_input_value(self, name: str, value: Any) -> None:
if name in self._inputs:
input_value = self._inputs[name].value
if isinstance(input_value, Component):
methods = ", ".join([f"'{output.method}'" for output in input_value.outputs])
msg = (
f"You set {input_value.display_name} as value for `{name}`. "
f"You should pass one of the following: {methods}"
)
raise ValueError(msg)
if callable(input_value) and hasattr(input_value, "__self__"):
msg = f"Input {name} is connected to {input_value.__self__.display_name}.{input_value.__name__}"
raise ValueError(msg)
self._inputs[name].value = value
if hasattr(self._inputs[name], "load_from_db"):
self._inputs[name].load_from_db = False
else:
msg = f"Input {name} not found in {self.__class__.__name__}"
raise ValueError(msg)
def _validate_outputs(self) -> None:
# Raise Error if some rule isn't met
pass
def _map_parameters_on_frontend_node(self, frontend_node: ComponentFrontendNode) -> None:
for name, value in self._parameters.items():
frontend_node.set_field_value_in_template(name, value)
def _map_parameters_on_template(self, template: dict) -> None:
for name, value in self._parameters.items():
try:
template[name]["value"] = value
except KeyError as e:
close_match = find_closest_match(name, list(template.keys()))
if close_match:
msg = f"Parameter '{name}' not found in {self.__class__.__name__}. Did you mean '{close_match}'?"
raise ValueError(msg) from e
msg = f"Parameter {name} not found in {self.__class__.__name__}. "
raise ValueError(msg) from e
def _get_method_return_type(self, method_name: str) -> list[str]:
method = getattr(self, method_name)
return_type = get_type_hints(method)["return"]
extracted_return_types = self._extract_return_type(return_type)
return [format_type(extracted_return_type) for extracted_return_type in extracted_return_types]
def _update_template(self, frontend_node: dict):
return frontend_node
def to_frontend_node(self):
# ! This part here is clunky but we need it like this for
# ! backwards compatibility. We can change how prompt component
# ! works and then update this later
field_config = self.get_template_config(self)
frontend_node = ComponentFrontendNode.from_inputs(**field_config)
for key in self._inputs:
frontend_node.set_field_load_from_db_in_template(key, value=False)
self._map_parameters_on_frontend_node(frontend_node)
frontend_node_dict = frontend_node.to_dict(keep_name=False)
frontend_node_dict = self._update_template(frontend_node_dict)
self._map_parameters_on_template(frontend_node_dict["template"])
frontend_node = ComponentFrontendNode.from_dict(frontend_node_dict)
if not self._code:
self.set_class_code()
code_field = Input(
dynamic=True,
required=True,
placeholder="",
multiline=True,
value=self._code,
password=False,
name="code",
advanced=True,
field_type="code",
is_list=False,
)
frontend_node.template.add_field(code_field)
for output in frontend_node.outputs:
if output.types:
continue
return_types = self._get_method_return_type(output.method)
output.add_types(return_types)
output.set_selected()
frontend_node.validate_component()
frontend_node.set_base_classes_from_outputs()
return {
"data": {
"node": frontend_node.to_dict(keep_name=False),
"type": self.name or self.__class__.__name__,
"id": self._id,
},
"id": self._id,
}
def _validate_inputs(self, params: dict) -> None:
# Params keys are the `name` attribute of the Input objects
for key, value in params.copy().items():
if key not in self._inputs:
continue
input_ = self._inputs[key]
# BaseInputMixin has a `validate_assignment=True`
input_.value = value
params[input_.name] = input_.value
def set_attributes(self, params: dict) -> None:
self._validate_inputs(params)
attributes = {}
for key, value in params.items():
if key in self.__dict__ and value != getattr(self, key):
msg = (
f"{self.__class__.__name__} defines an input parameter named '{key}' "
f"that is a reserved word and cannot be used."
)
raise ValueError(msg)
attributes[key] = value
for key, input_obj in self._inputs.items():
if key not in attributes and key not in self._attributes:
attributes[key] = input_obj.value or None
self._attributes.update(attributes)
def _set_outputs(self, outputs: list[dict]) -> None:
self.outputs = [Output(**output) for output in outputs]
for output in self.outputs:
setattr(self, output.name, output)
self._outputs_map[output.name] = output
def get_trace_as_inputs(self):
predefined_inputs = {
input_.name: input_.value
for input_ in self.inputs
if hasattr(input_, "trace_as_input") and input_.trace_as_input
}
# Runtime inputs
runtime_inputs = {name: input_.value for name, input_ in self._inputs.items() if hasattr(input_, "value")}
return {**predefined_inputs, **runtime_inputs}
def get_trace_as_metadata(self):
return {
input_.name: input_.value
for input_ in self.inputs
if hasattr(input_, "trace_as_metadata") and input_.trace_as_metadata
}
async def _build_with_tracing(self):
inputs = self.get_trace_as_inputs()
metadata = self.get_trace_as_metadata()
async with self._tracing_service.trace_context(self, self.trace_name, inputs, metadata):
results, artifacts = await self._build_results()
self._tracing_service.set_outputs(self.trace_name, results)
return results, artifacts
async def _build_without_tracing(self):
return await self._build_results()
async def build_results(self):
"""Build the results of the component."""
if hasattr(self, "graph"):
session_id = self.graph.session_id
elif hasattr(self, "_session_id"):
session_id = self._session_id
else:
session_id = None
try:
if self._tracing_service:
return await self._build_with_tracing()
return await self._build_without_tracing()
except StreamingError as e:
await self.send_error(
exception=e.cause,
session_id=session_id,
trace_name=getattr(self, "trace_name", None),
source=e.source,
)
raise e.cause # noqa: B904
except Exception as e:
await self.send_error(
exception=e,
session_id=session_id,
source=Source(id=self._id, display_name=self.display_name, source=self.display_name),
trace_name=getattr(self, "trace_name", None),
)
raise
async def _build_results(self) -> tuple[dict, dict]:
results = {}
artifacts = {}
if hasattr(self, "_pre_run_setup"):
self._pre_run_setup()
if hasattr(self, "outputs"):
if any(getattr(_input, "tool_mode", False) for _input in self.inputs):
self._append_tool_to_outputs_map()
for output in self._outputs_map.values():
# Build the output if it's connected to some other vertex
# or if it's not connected to any vertex
if (
not self._vertex
or not self._vertex.outgoing_edges
or output.name in self._vertex.edges_source_names
):
if output.method is None:
msg = f"Output {output.name} does not have a method defined."
raise ValueError(msg)
self._current_output = output.name
method: Callable = getattr(self, output.method)
if output.cache and output.value != UNDEFINED:
results[output.name] = output.value
result = output.value
else:
# If the method is asynchronous, we need to await it
if inspect.iscoroutinefunction(method):
result = await method()
else:
result = await asyncio.to_thread(method)
if (
self._vertex is not None
and isinstance(result, Message)
and result.flow_id is None
and self._vertex.graph.flow_id is not None
):
result.set_flow_id(self._vertex.graph.flow_id)
results[output.name] = result
output.value = result
custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, dict | Data | str):
custom_repr = result
if not isinstance(custom_repr, str):
custom_repr = str(custom_repr)
raw = result
if self.status is None:
artifact_value = raw
else:
artifact_value = self.status
raw = self.status
if hasattr(raw, "data") and raw is not None:
raw = raw.data
if raw is None:
raw = custom_repr
elif hasattr(raw, "model_dump") and raw is not None:
raw = raw.model_dump()
if raw is None and isinstance(result, dict | Data | str):
raw = result.data if isinstance(result, Data) else result
artifact_type = get_artifact_type(artifact_value, result)
raw, artifact_type = post_process_raw(raw, artifact_type)
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
artifacts[output.name] = artifact
self._output_logs[output.name] = self._logs
self._logs = []
self._current_output = ""
self._artifacts = artifacts
self._results = results
if self._tracing_service:
self._tracing_service.set_outputs(self.trace_name, results)
return results, artifacts
def custom_repr(self):
if self.repr_value == "":
self.repr_value = self.status
if isinstance(self.repr_value, dict):
return yaml.dump(self.repr_value)
if isinstance(self.repr_value, str):
return self.repr_value
if isinstance(self.repr_value, BaseModel) and not isinstance(self.repr_value, Data):
return str(self.repr_value)
return self.repr_value
def build_inputs(self):
"""Builds the inputs for the custom component.
Returns:
List[Input]: The list of inputs.
"""
# This function is similar to build_config, but it will process the inputs
# and return them as a dict with keys being the Input.name and values being the Input.model_dump()
self.inputs = self.template_config.get("inputs", [])
if not self.inputs:
return {}
return {_input.name: _input.model_dump(by_alias=True, exclude_none=True) for _input in self.inputs}
def _get_field_order(self):
try:
inputs = self.template_config["inputs"]
return [field.name for field in inputs]
except KeyError:
return []
def build(self, **kwargs) -> None:
self.set_attributes(kwargs)
def _get_fallback_input(self, **kwargs):
return Input(**kwargs)
def to_toolkit(self) -> list[Tool]:
component_toolkit = _get_component_toolkit()
tools = component_toolkit(component=self).get_tools(callbacks=self.get_langchain_callbacks())
if hasattr(self, TOOLS_METADATA_INPUT_NAME):
tools = component_toolkit(component=self, metadata=self.tools_metadata).update_tools_metadata(tools=tools)
return tools
def get_project_name(self):
if hasattr(self, "_tracing_service") and self._tracing_service:
return self._tracing_service.project_name
return "Langflow"
def log(self, message: LoggableType | list[LoggableType], name: str | None = None) -> None:
"""Logs a message.
Args:
message (LoggableType | list[LoggableType]): The message to log.
name (str, optional): The name of the log. Defaults to None.
"""
if name is None:
name = f"Log {len(self._logs) + 1}"
log = Log(message=message, type=get_artifact_type(message), name=name)
self._logs.append(log)
if self._tracing_service and self._vertex:
self._tracing_service.add_log(trace_name=self.trace_name, log=log)
if self._event_manager is not None and self._current_output:
data = log.model_dump()
data["output"] = self._current_output
data["component_id"] = self._id
self._event_manager.on_log(data=data)
def _append_tool_output(self) -> None:
if next((output for output in self.outputs if output.name == TOOL_OUTPUT_NAME), None) is None:
self.outputs.append(
Output(
name=TOOL_OUTPUT_NAME,
display_name=TOOL_OUTPUT_DISPLAY_NAME,
method="to_toolkit",
types=["Tool"],
)
)
async def send_message(self, message: Message, id_: str | None = None):
if (hasattr(self, "graph") and self.graph.session_id) and (message is not None and not message.session_id):
message.session_id = self.graph.session_id
stored_message = await self._store_message(message)
self._stored_message_id = stored_message.id
try:
complete_message = ""
if (
self._should_stream_message(stored_message, message)
and message is not None
and isinstance(message.text, AsyncIterator | Iterator)
):
complete_message = await self._stream_message(message.text, stored_message)
stored_message.text = complete_message
stored_message = await self._update_stored_message(stored_message)
else:
# Only send message event for non-streaming messages
self._send_message_event(stored_message, id_=id_)
except Exception:
# remove the message from the database
await delete_message(stored_message.id)
raise
self.status = stored_message
return stored_message
async def _store_message(self, message: Message) -> Message:
flow_id = self.graph.flow_id if hasattr(self, "graph") else None
messages = await astore_message(message, flow_id=flow_id)
if len(messages) != 1:
msg = "Only one message can be stored at a time."
raise ValueError(msg)
return messages[0]
def _send_message_event(self, message: Message, id_: str | None = None, category: str | None = None) -> None:
if hasattr(self, "_event_manager") and self._event_manager:
data_dict = message.data.copy() if hasattr(message, "data") else message.model_dump()
if id_ and not data_dict.get("id"):
data_dict["id"] = id_
category = category or data_dict.get("category", None)
match category:
case "error":
self._event_manager.on_error(data=data_dict)
case "remove_message":
self._event_manager.on_remove_message(data={"id": data_dict["id"]})
case _:
self._event_manager.on_message(data=data_dict)
def _should_stream_message(self, stored_message: Message, original_message: Message) -> bool:
return bool(
hasattr(self, "_event_manager")
and self._event_manager
and stored_message.id
and not isinstance(original_message.text, str)
)
async def _update_stored_message(self, stored_message: Message) -> Message:
message_tables = await aupdate_messages(stored_message)
if len(message_tables) != 1:
msg = "Only one message can be updated at a time."
raise ValueError(msg)
message_table = message_tables[0]
return await Message.create(**message_table.model_dump())
async def _stream_message(self, iterator: AsyncIterator | Iterator, message: Message) -> str:
if not isinstance(iterator, AsyncIterator | Iterator):
msg = "The message must be an iterator or an async iterator."
raise TypeError(msg)
if isinstance(iterator, AsyncIterator):
return await self._handle_async_iterator(iterator, message.id, message)
try:
complete_message = ""
first_chunk = True
for chunk in iterator:
complete_message = self._process_chunk(
chunk.content, complete_message, message.id, message, first_chunk=first_chunk
)
first_chunk = False
except Exception as e:
raise StreamingError(cause=e, source=message.properties.source) from e
else:
return complete_message
async def _handle_async_iterator(self, iterator: AsyncIterator, message_id: str, message: Message) -> str:
complete_message = ""
first_chunk = True
async for chunk in iterator:
complete_message = self._process_chunk(
chunk.content, complete_message, message_id, message, first_chunk=first_chunk
)
first_chunk = False
return complete_message
def _process_chunk(
self, chunk: str, complete_message: str, message_id: str, message: Message, *, first_chunk: bool = False
) -> str:
complete_message += chunk
if self._event_manager:
if first_chunk:
# Send the initial message only on the first chunk
msg_copy = message.model_copy()
msg_copy.text = complete_message
self._send_message_event(msg_copy, id_=message_id)
self._event_manager.on_token(
data={
"chunk": chunk,
"id": str(message_id),
}
)
return complete_message
async def send_error(
self,
exception: Exception,
session_id: str,
trace_name: str,
source: Source,
) -> Message:
"""Send an error message to the frontend."""
flow_id = self.graph.flow_id if hasattr(self, "graph") else None
error_message = ErrorMessage(
flow_id=flow_id,
exception=exception,
session_id=session_id,
trace_name=trace_name,
source=source,
)
await self.send_message(error_message)
return error_message
def _append_tool_to_outputs_map(self):
self._outputs_map[TOOL_OUTPUT_NAME] = self._build_tool_output()
# add a new input for the tool schema
# self.inputs.append(self._build_tool_schema())
def _build_tool_output(self) -> Output:
return Output(name=TOOL_OUTPUT_NAME, display_name=TOOL_OUTPUT_DISPLAY_NAME, method="to_toolkit", types=["Tool"])
def _build_tools_metadata_input(self):
tools = self.to_toolkit()
tool_data = (
self.tools_metadata
if hasattr(self, TOOLS_METADATA_INPUT_NAME)
else [{"name": tool.name, "description": tool.description} for tool in tools]
)
try:
from langflow.io import TableInput
except ImportError as e:
msg = "Failed to import TableInput from langflow.io"
raise ImportError(msg) from e
return TableInput(
name=TOOLS_METADATA_INPUT_NAME,
display_name="Tools Metadata",
real_time_refresh=True,
table_schema=TOOL_TABLE_SCHEMA,
value=tool_data,
)