from __future__ import annotations import ast import asyncio import inspect from collections.abc import AsyncIterator, Iterator from copy import deepcopy from textwrap import dedent from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, get_type_hints import nanoid import yaml from langchain_core.tools import StructuredTool from pydantic import BaseModel, ValidationError from langflow.base.tools.constants import ( TOOL_OUTPUT_DISPLAY_NAME, TOOL_OUTPUT_NAME, TOOL_TABLE_SCHEMA, TOOLS_METADATA_INPUT_NAME, ) from langflow.custom.tree_visitor import RequiredInputsVisitor from langflow.exceptions.component import StreamingError from langflow.field_typing import Tool # noqa: TC001 Needed by _add_toolkit_output from langflow.graph.state.model import create_state_model from langflow.helpers.custom import format_type from langflow.memory import astore_message, aupdate_messages, delete_message from langflow.schema.artifact import get_artifact_type, post_process_raw from langflow.schema.data import Data from langflow.schema.message import ErrorMessage, Message from langflow.schema.properties import Source from langflow.services.tracing.schema import Log from langflow.template.field.base import UNDEFINED, Input, Output from langflow.template.frontend_node.custom_components import ComponentFrontendNode from langflow.utils.async_helpers import run_until_complete from langflow.utils.util import find_closest_match from .custom_component import CustomComponent if TYPE_CHECKING: from collections.abc import Callable from langflow.events.event_manager import EventManager from langflow.graph.edge.schema import EdgeData from langflow.graph.vertex.base import Vertex from langflow.inputs.inputs import InputTypes from langflow.schema import dotdict from langflow.schema.log import LoggableType _ComponentToolkit = None def _get_component_toolkit(): global _ComponentToolkit # noqa: PLW0603 if _ComponentToolkit is None: from langflow.base.tools.component_tool import ComponentToolkit _ComponentToolkit = ComponentToolkit return _ComponentToolkit BACKWARDS_COMPATIBLE_ATTRIBUTES = ["user_id", "vertex", "tracing_service"] CONFIG_ATTRIBUTES = ["_display_name", "_description", "_icon", "_name", "_metadata"] class PlaceholderGraph(NamedTuple): """A placeholder graph structure for components, providing backwards compatibility. and enabling component execution without a full graph object. This lightweight structure contains essential information typically found in a complete graph, allowing components to function in isolation or in simplified contexts. Attributes: flow_id (str | None): Unique identifier for the flow, if applicable. user_id (str | None): Identifier of the user associated with the flow, if any. session_id (str | None): Identifier for the current session, if applicable. context (dict): Additional contextual information for the component's execution. flow_name (str | None): Name of the flow, if available. """ flow_id: str | None user_id: str | None session_id: str | None context: dict flow_name: str | None class Component(CustomComponent): inputs: list[InputTypes] = [] outputs: list[Output] = [] code_class_base_inheritance: ClassVar[str] = "Component" _output_logs: dict[str, list[Log]] = {} _current_output: str = "" _metadata: dict = {} _ctx: dict = {} _code: str | None = None _logs: list[Log] = [] def __init__(self, **kwargs) -> None: # if key starts with _ it is a config # else it is an input inputs = {} config = {} for key, value in kwargs.items(): if key.startswith("_"): config[key] = value elif key in CONFIG_ATTRIBUTES: config[key[1:]] = value else: inputs[key] = value self._inputs: dict[str, InputTypes] = {} self._outputs_map: dict[str, Output] = {} self._results: dict[str, Any] = {} self._attributes: dict[str, Any] = {} self._parameters = inputs or {} self._edges: list[EdgeData] = [] self._components: list[Component] = [] self._current_output = "" self._event_manager: EventManager | None = None self._state_model = None self.set_attributes(self._parameters) self._output_logs = {} config = config or {} if "_id" not in config: config |= {"_id": f"{self.__class__.__name__}-{nanoid.generate(size=5)}"} self.__inputs = inputs self.__config = config self._reset_all_output_values() super().__init__(**config) if hasattr(self, "_trace_type"): self.trace_type = self._trace_type if not hasattr(self, "trace_type"): self.trace_type = "chain" if self.inputs is not None: self.map_inputs(self.inputs) if self.outputs is not None: self.map_outputs(self.outputs) # Set output types self._set_output_types(list(self._outputs_map.values())) self.set_class_code() self._set_output_required_inputs() @property def ctx(self): if not hasattr(self, "graph") or self.graph is None: msg = "Graph not found. Please build the graph first." raise ValueError(msg) return self.graph.context def add_to_ctx(self, key: str, value: Any, *, overwrite: bool = False) -> None: """Add a key-value pair to the context. Args: key (str): The key to add. value (Any): The value to associate with the key. overwrite (bool, optional): Whether to overwrite the existing value. Defaults to False. Raises: ValueError: If the graph is not built. """ if not hasattr(self, "graph") or self.graph is None: msg = "Graph not found. Please build the graph first." raise ValueError(msg) if key in self.graph.context and not overwrite: msg = f"Key {key} already exists in context. Set overwrite=True to overwrite." raise ValueError(msg) self.graph.context.update({key: value}) def update_ctx(self, value_dict: dict[str, Any]) -> None: """Update the context with a dictionary of values. Args: value_dict (dict[str, Any]): The dictionary of values to update. Raises: ValueError: If the graph is not built. """ if not hasattr(self, "graph") or self.graph is None: msg = "Graph not found. Please build the graph first." raise ValueError(msg) if not isinstance(value_dict, dict): msg = "Value dict must be a dictionary" raise TypeError(msg) self.graph.context.update(value_dict) def _pre_run_setup(self): pass def set_event_manager(self, event_manager: EventManager | None = None) -> None: self._event_manager = event_manager def _reset_all_output_values(self) -> None: if isinstance(self._outputs_map, dict): for output in self._outputs_map.values(): output.value = UNDEFINED def _build_state_model(self): if self._state_model: return self._state_model name = self.name or self.__class__.__name__ model_name = f"{name}StateModel" fields = {} for output in self._outputs_map.values(): fields[output.name] = getattr(self, output.method) self._state_model = create_state_model(model_name=model_name, **fields) return self._state_model def get_state_model_instance_getter(self): state_model = self._build_state_model() def _instance_getter(_): return state_model() _instance_getter.__annotations__["return"] = state_model return _instance_getter def __deepcopy__(self, memo: dict) -> Component: if id(self) in memo: return memo[id(self)] kwargs = deepcopy(self.__config, memo) kwargs["inputs"] = deepcopy(self.__inputs, memo) new_component = type(self)(**kwargs) new_component._code = self._code new_component._outputs_map = self._outputs_map new_component._inputs = self._inputs new_component._edges = self._edges new_component._components = self._components new_component._parameters = self._parameters new_component._attributes = self._attributes new_component._output_logs = self._output_logs new_component._logs = self._logs # type: ignore[attr-defined] memo[id(self)] = new_component return new_component def set_class_code(self) -> None: # Get the source code of the calling class if self._code: return try: module = inspect.getmodule(self.__class__) if module is None: msg = "Could not find module for class" raise ValueError(msg) class_code = inspect.getsource(module) self._code = class_code except OSError as e: msg = f"Could not find source code for {self.__class__.__name__}" raise ValueError(msg) from e def set(self, **kwargs): """Connects the component to other components or sets parameters and attributes. Args: **kwargs: Keyword arguments representing the connections, parameters, and attributes. Returns: None Raises: KeyError: If the specified input name does not exist. """ for key, value in kwargs.items(): self._process_connection_or_parameters(key, value) return self def list_inputs(self): """Returns a list of input names.""" return [_input.name for _input in self.inputs] def list_outputs(self): """Returns a list of output names.""" return [_output.name for _output in self._outputs_map.values()] async def run(self): """Executes the component's logic and returns the result. Returns: The result of executing the component's logic. """ return await self._run() def set_vertex(self, vertex: Vertex) -> None: """Sets the vertex for the component. Args: vertex (Vertex): The vertex to set. Returns: None """ self._vertex = vertex def get_input(self, name: str) -> Any: """Retrieves the value of the input with the specified name. Args: name (str): The name of the input. Returns: Any: The value of the input. Raises: ValueError: If the input with the specified name is not found. """ if name in self._inputs: return self._inputs[name] msg = f"Input {name} not found in {self.__class__.__name__}" raise ValueError(msg) def get_output(self, name: str) -> Any: """Retrieves the output with the specified name. Args: name (str): The name of the output to retrieve. Returns: Any: The output value. Raises: ValueError: If the output with the specified name is not found. """ if name in self._outputs_map: return self._outputs_map[name] msg = f"Output {name} not found in {self.__class__.__name__}" raise ValueError(msg) def set_on_output(self, name: str, **kwargs) -> None: output = self.get_output(name) for key, value in kwargs.items(): if not hasattr(output, key): msg = f"Output {name} does not have a method {key}" raise ValueError(msg) setattr(output, key, value) def set_output_value(self, name: str, value: Any) -> None: if name in self._outputs_map: self._outputs_map[name].value = value else: msg = f"Output {name} not found in {self.__class__.__name__}" raise ValueError(msg) def map_outputs(self, outputs: list[Output]) -> None: """Maps the given list of outputs to the component. Args: outputs (List[Output]): The list of outputs to be mapped. Raises: ValueError: If the output name is None. Returns: None """ for output in outputs: if output.name is None: msg = "Output name cannot be None." raise ValueError(msg) # Deepcopy is required to avoid modifying the original component; # allows each instance of each component to modify its own output self._outputs_map[output.name] = deepcopy(output) def map_inputs(self, inputs: list[InputTypes]) -> None: """Maps the given inputs to the component. Args: inputs (List[InputTypes]): A list of InputTypes objects representing the inputs. Raises: ValueError: If the input name is None. """ for input_ in inputs: if input_.name is None: msg = "Input name cannot be None." raise ValueError(msg) self._inputs[input_.name] = deepcopy(input_) def validate(self, params: dict) -> None: """Validates the component parameters. Args: params (dict): A dictionary containing the component parameters. Raises: ValueError: If the inputs are not valid. ValueError: If the outputs are not valid. """ self._validate_inputs(params) self._validate_outputs() def update_inputs( self, build_config: dotdict, field_value: Any, field_name: str | None = None, ): return self.update_build_config(build_config, field_value, field_name) def run_and_validate_update_outputs(self, frontend_node: dict, field_name: str, field_value: Any): frontend_node = self.update_outputs(frontend_node, field_name, field_value) if field_name == "tool_mode" or frontend_node.get("tool_mode"): is_tool_mode = field_value or frontend_node.get("tool_mode") frontend_node["outputs"] = [self._build_tool_output()] if is_tool_mode else frontend_node["outputs"] if is_tool_mode: frontend_node.setdefault("template", {}) frontend_node["template"][TOOLS_METADATA_INPUT_NAME] = self._build_tools_metadata_input().to_dict() elif "template" in frontend_node: frontend_node["template"].pop(TOOLS_METADATA_INPUT_NAME, None) self.tools_metadata = frontend_node.get("template", {}).get(TOOLS_METADATA_INPUT_NAME, {}).get("value") return self._validate_frontend_node(frontend_node) def _validate_frontend_node(self, frontend_node: dict): # Check if all outputs are either Output or a valid Output model for index, output in enumerate(frontend_node["outputs"]): if isinstance(output, dict): try: output_ = Output(**output) self._set_output_return_type(output_) output_dict = output_.model_dump() except ValidationError as e: msg = f"Invalid output: {e}" raise ValueError(msg) from e elif isinstance(output, Output): # we need to serialize it self._set_output_return_type(output) output_dict = output.model_dump() else: msg = f"Invalid output type: {type(output)}" raise TypeError(msg) frontend_node["outputs"][index] = output_dict return frontend_node def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict: # noqa: ARG002 """Default implementation for updating outputs based on field changes. Subclasses can override this to modify outputs based on field_name and field_value. """ return frontend_node def _set_output_types(self, outputs: list[Output]) -> None: for output in outputs: self._set_output_return_type(output) def _set_output_return_type(self, output: Output) -> None: if output.method is None: msg = f"Output {output.name} does not have a method" raise ValueError(msg) return_types = self._get_method_return_type(output.method) output.add_types(return_types) output.set_selected() def _set_output_required_inputs(self) -> None: for output in self.outputs: if not output.method: continue method = getattr(self, output.method, None) if not method or not callable(method): continue try: source_code = inspect.getsource(method) ast_tree = ast.parse(dedent(source_code)) except Exception: # noqa: BLE001 ast_tree = ast.parse(dedent(self._code or "")) visitor = RequiredInputsVisitor(self._inputs) visitor.visit(ast_tree) output.required_inputs = sorted(visitor.required_inputs) def get_output_by_method(self, method: Callable): # method is a callable and output.method is a string # we need to find the output that has the same method output = next((output for output in self._outputs_map.values() if output.method == method.__name__), None) if output is None: method_name = method.__name__ if hasattr(method, "__name__") else str(method) msg = f"Output with method {method_name} not found" raise ValueError(msg) return output def _inherits_from_component(self, method: Callable): # check if the method is a method from a class that inherits from Component # and that it is an output of that class return hasattr(method, "__self__") and isinstance(method.__self__, Component) def _method_is_valid_output(self, method: Callable): # check if the method is a method from a class that inherits from Component # and that it is an output of that class return ( hasattr(method, "__self__") and isinstance(method.__self__, Component) and method.__self__.get_output_by_method(method) ) def _build_error_string_from_matching_pairs(self, matching_pairs: list[tuple[Output, Input]]): text = "" for output, input_ in matching_pairs: text += f"{output.name}[{','.join(output.types)}]->{input_.name}[{','.join(input_.input_types or [])}]\n" return text def _find_matching_output_method(self, input_name: str, value: Component): """Find the output method from the given component and input name. Find the output method from the given component (`value`) that matches the specified input (`input_name`) in the current component. This method searches through all outputs of the provided component to find outputs whose types match the input types of the specified input in the current component. If exactly one matching output is found, it returns the corresponding method. If multiple matching outputs are found, it raises an error indicating ambiguity. If no matching outputs are found, it raises an error indicating that no suitable output was found. Args: input_name (str): The name of the input in the current component to match. value (Component): The component whose outputs are to be considered. Returns: Callable: The method corresponding to the matching output. Raises: ValueError: If multiple matching outputs are found, if no matching outputs are found, or if the output method is invalid. """ # Retrieve all outputs from the given component outputs = value._outputs_map.values() # Prepare to collect matching output-input pairs matching_pairs = [] # Get the input object from the current component input_ = self._inputs[input_name] # Iterate over outputs to find matches based on types matching_pairs = [ (output, input_) for output in outputs for output_type in output.types # Check if the output type matches the input's accepted types if input_.input_types and output_type in input_.input_types ] # If multiple matches are found, raise an error indicating ambiguity if len(matching_pairs) > 1: matching_pairs_str = self._build_error_string_from_matching_pairs(matching_pairs) msg = ( f"There are multiple outputs from {value.__class__.__name__} " f"that can connect to inputs in {self.__class__.__name__}: {matching_pairs_str}" ) # If no matches are found, raise an error indicating no suitable output if not matching_pairs: msg = ( f"No matching output from {value.__class__.__name__} found for input '{input_name}' " f"in {self.__class__.__name__}." ) raise ValueError(msg) # Get the matching output and input pair output, input_ = matching_pairs[0] # Ensure that the output method is a valid method name (string) if not isinstance(output.method, str): msg = f"Method {output.method} is not a valid output of {value.__class__.__name__}" raise TypeError(msg) return getattr(value, output.method) def _process_connection_or_parameter(self, key, value) -> None: input_ = self._get_or_create_input(key) # We need to check if callable AND if it is a method from a class that inherits from Component if isinstance(value, Component): # We need to find the Output that can connect to an input of the current component # if there's more than one output that matches, we need to raise an error # because we don't know which one to connect to value = self._find_matching_output_method(key, value) if callable(value) and self._inherits_from_component(value): try: self._method_is_valid_output(value) except ValueError as e: msg = f"Method {value.__name__} is not a valid output of {value.__self__.__class__.__name__}" raise ValueError(msg) from e self._connect_to_component(key, value, input_) else: self._set_parameter_or_attribute(key, value) def _process_connection_or_parameters(self, key, value) -> None: # if value is a list of components, we need to process each component # Note this update make sure it is not a list str | int | float | bool | type(None) if isinstance(value, list) and not any( isinstance(val, str | int | float | bool | type(None) | Message | Data | StructuredTool) for val in value ): for val in value: self._process_connection_or_parameter(key, val) else: self._process_connection_or_parameter(key, value) def _get_or_create_input(self, key): try: return self._inputs[key] except KeyError: input_ = self._get_fallback_input(name=key, display_name=key) self._inputs[key] = input_ self.inputs.append(input_) return input_ def _connect_to_component(self, key, value, input_) -> None: component = value.__self__ self._components.append(component) output = component.get_output_by_method(value) self._add_edge(component, key, output, input_) def _add_edge(self, component, key, output, input_) -> None: self._edges.append( { "source": component._id, "target": self._id, "data": { "sourceHandle": { "dataType": component.name or component.__class__.__name__, "id": component._id, "name": output.name, "output_types": output.types, }, "targetHandle": { "fieldName": key, "id": self._id, "inputTypes": input_.input_types, "type": input_.field_type, }, }, } ) def _set_parameter_or_attribute(self, key, value) -> None: if isinstance(value, Component): methods = ", ".join([f"'{output.method}'" for output in value.outputs]) msg = ( f"You set {value.display_name} as value for `{key}`. " f"You should pass one of the following: {methods}" ) raise TypeError(msg) self._set_input_value(key, value) self._parameters[key] = value self._attributes[key] = value def __call__(self, **kwargs): self.set(**kwargs) return run_until_complete(self.run()) async def _run(self): # Resolve callable inputs for key, _input in self._inputs.items(): if asyncio.iscoroutinefunction(_input.value): self._inputs[key].value = await _input.value() elif callable(_input.value): self._inputs[key].value = await asyncio.to_thread(_input.value) self.set_attributes({}) return await self.build_results() def __getattr__(self, name: str) -> Any: if "_attributes" in self.__dict__ and name in self.__dict__["_attributes"]: return self.__dict__["_attributes"][name] if "_inputs" in self.__dict__ and name in self.__dict__["_inputs"]: return self.__dict__["_inputs"][name].value if "_outputs_map" in self.__dict__ and name in self.__dict__["_outputs_map"]: return self.__dict__["_outputs_map"][name] if name in BACKWARDS_COMPATIBLE_ATTRIBUTES: return self.__dict__[f"_{name}"] if name.startswith("_") and name[1:] in BACKWARDS_COMPATIBLE_ATTRIBUTES: return self.__dict__[name] if name == "graph": # If it got up to here it means it was going to raise session_id = self._session_id if hasattr(self, "_session_id") else None user_id = self._user_id if hasattr(self, "_user_id") else None flow_name = self._flow_name if hasattr(self, "_flow_name") else None flow_id = self._flow_id if hasattr(self, "_flow_id") else None return PlaceholderGraph( flow_id=flow_id, user_id=str(user_id), session_id=session_id, context={}, flow_name=flow_name ) msg = f"{name} not found in {self.__class__.__name__}" raise AttributeError(msg) def _set_input_value(self, name: str, value: Any) -> None: if name in self._inputs: input_value = self._inputs[name].value if isinstance(input_value, Component): methods = ", ".join([f"'{output.method}'" for output in input_value.outputs]) msg = ( f"You set {input_value.display_name} as value for `{name}`. " f"You should pass one of the following: {methods}" ) raise ValueError(msg) if callable(input_value) and hasattr(input_value, "__self__"): msg = f"Input {name} is connected to {input_value.__self__.display_name}.{input_value.__name__}" raise ValueError(msg) self._inputs[name].value = value if hasattr(self._inputs[name], "load_from_db"): self._inputs[name].load_from_db = False else: msg = f"Input {name} not found in {self.__class__.__name__}" raise ValueError(msg) def _validate_outputs(self) -> None: # Raise Error if some rule isn't met pass def _map_parameters_on_frontend_node(self, frontend_node: ComponentFrontendNode) -> None: for name, value in self._parameters.items(): frontend_node.set_field_value_in_template(name, value) def _map_parameters_on_template(self, template: dict) -> None: for name, value in self._parameters.items(): try: template[name]["value"] = value except KeyError as e: close_match = find_closest_match(name, list(template.keys())) if close_match: msg = f"Parameter '{name}' not found in {self.__class__.__name__}. Did you mean '{close_match}'?" raise ValueError(msg) from e msg = f"Parameter {name} not found in {self.__class__.__name__}. " raise ValueError(msg) from e def _get_method_return_type(self, method_name: str) -> list[str]: method = getattr(self, method_name) return_type = get_type_hints(method)["return"] extracted_return_types = self._extract_return_type(return_type) return [format_type(extracted_return_type) for extracted_return_type in extracted_return_types] def _update_template(self, frontend_node: dict): return frontend_node def to_frontend_node(self): # ! This part here is clunky but we need it like this for # ! backwards compatibility. We can change how prompt component # ! works and then update this later field_config = self.get_template_config(self) frontend_node = ComponentFrontendNode.from_inputs(**field_config) for key in self._inputs: frontend_node.set_field_load_from_db_in_template(key, value=False) self._map_parameters_on_frontend_node(frontend_node) frontend_node_dict = frontend_node.to_dict(keep_name=False) frontend_node_dict = self._update_template(frontend_node_dict) self._map_parameters_on_template(frontend_node_dict["template"]) frontend_node = ComponentFrontendNode.from_dict(frontend_node_dict) if not self._code: self.set_class_code() code_field = Input( dynamic=True, required=True, placeholder="", multiline=True, value=self._code, password=False, name="code", advanced=True, field_type="code", is_list=False, ) frontend_node.template.add_field(code_field) for output in frontend_node.outputs: if output.types: continue return_types = self._get_method_return_type(output.method) output.add_types(return_types) output.set_selected() frontend_node.validate_component() frontend_node.set_base_classes_from_outputs() return { "data": { "node": frontend_node.to_dict(keep_name=False), "type": self.name or self.__class__.__name__, "id": self._id, }, "id": self._id, } def _validate_inputs(self, params: dict) -> None: # Params keys are the `name` attribute of the Input objects for key, value in params.copy().items(): if key not in self._inputs: continue input_ = self._inputs[key] # BaseInputMixin has a `validate_assignment=True` input_.value = value params[input_.name] = input_.value def set_attributes(self, params: dict) -> None: self._validate_inputs(params) attributes = {} for key, value in params.items(): if key in self.__dict__ and value != getattr(self, key): msg = ( f"{self.__class__.__name__} defines an input parameter named '{key}' " f"that is a reserved word and cannot be used." ) raise ValueError(msg) attributes[key] = value for key, input_obj in self._inputs.items(): if key not in attributes and key not in self._attributes: attributes[key] = input_obj.value or None self._attributes.update(attributes) def _set_outputs(self, outputs: list[dict]) -> None: self.outputs = [Output(**output) for output in outputs] for output in self.outputs: setattr(self, output.name, output) self._outputs_map[output.name] = output def get_trace_as_inputs(self): predefined_inputs = { input_.name: input_.value for input_ in self.inputs if hasattr(input_, "trace_as_input") and input_.trace_as_input } # Runtime inputs runtime_inputs = {name: input_.value for name, input_ in self._inputs.items() if hasattr(input_, "value")} return {**predefined_inputs, **runtime_inputs} def get_trace_as_metadata(self): return { input_.name: input_.value for input_ in self.inputs if hasattr(input_, "trace_as_metadata") and input_.trace_as_metadata } async def _build_with_tracing(self): inputs = self.get_trace_as_inputs() metadata = self.get_trace_as_metadata() async with self._tracing_service.trace_context(self, self.trace_name, inputs, metadata): results, artifacts = await self._build_results() self._tracing_service.set_outputs(self.trace_name, results) return results, artifacts async def _build_without_tracing(self): return await self._build_results() async def build_results(self): """Build the results of the component.""" if hasattr(self, "graph"): session_id = self.graph.session_id elif hasattr(self, "_session_id"): session_id = self._session_id else: session_id = None try: if self._tracing_service: return await self._build_with_tracing() return await self._build_without_tracing() except StreamingError as e: await self.send_error( exception=e.cause, session_id=session_id, trace_name=getattr(self, "trace_name", None), source=e.source, ) raise e.cause # noqa: B904 except Exception as e: await self.send_error( exception=e, session_id=session_id, source=Source(id=self._id, display_name=self.display_name, source=self.display_name), trace_name=getattr(self, "trace_name", None), ) raise async def _build_results(self) -> tuple[dict, dict]: results = {} artifacts = {} if hasattr(self, "_pre_run_setup"): self._pre_run_setup() if hasattr(self, "outputs"): if any(getattr(_input, "tool_mode", False) for _input in self.inputs): self._append_tool_to_outputs_map() for output in self._outputs_map.values(): # Build the output if it's connected to some other vertex # or if it's not connected to any vertex if ( not self._vertex or not self._vertex.outgoing_edges or output.name in self._vertex.edges_source_names ): if output.method is None: msg = f"Output {output.name} does not have a method defined." raise ValueError(msg) self._current_output = output.name method: Callable = getattr(self, output.method) if output.cache and output.value != UNDEFINED: results[output.name] = output.value result = output.value else: # If the method is asynchronous, we need to await it if inspect.iscoroutinefunction(method): result = await method() else: result = await asyncio.to_thread(method) if ( self._vertex is not None and isinstance(result, Message) and result.flow_id is None and self._vertex.graph.flow_id is not None ): result.set_flow_id(self._vertex.graph.flow_id) results[output.name] = result output.value = result custom_repr = self.custom_repr() if custom_repr is None and isinstance(result, dict | Data | str): custom_repr = result if not isinstance(custom_repr, str): custom_repr = str(custom_repr) raw = result if self.status is None: artifact_value = raw else: artifact_value = self.status raw = self.status if hasattr(raw, "data") and raw is not None: raw = raw.data if raw is None: raw = custom_repr elif hasattr(raw, "model_dump") and raw is not None: raw = raw.model_dump() if raw is None and isinstance(result, dict | Data | str): raw = result.data if isinstance(result, Data) else result artifact_type = get_artifact_type(artifact_value, result) raw, artifact_type = post_process_raw(raw, artifact_type) artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} artifacts[output.name] = artifact self._output_logs[output.name] = self._logs self._logs = [] self._current_output = "" self._artifacts = artifacts self._results = results if self._tracing_service: self._tracing_service.set_outputs(self.trace_name, results) return results, artifacts def custom_repr(self): if self.repr_value == "": self.repr_value = self.status if isinstance(self.repr_value, dict): return yaml.dump(self.repr_value) if isinstance(self.repr_value, str): return self.repr_value if isinstance(self.repr_value, BaseModel) and not isinstance(self.repr_value, Data): return str(self.repr_value) return self.repr_value def build_inputs(self): """Builds the inputs for the custom component. Returns: List[Input]: The list of inputs. """ # This function is similar to build_config, but it will process the inputs # and return them as a dict with keys being the Input.name and values being the Input.model_dump() self.inputs = self.template_config.get("inputs", []) if not self.inputs: return {} return {_input.name: _input.model_dump(by_alias=True, exclude_none=True) for _input in self.inputs} def _get_field_order(self): try: inputs = self.template_config["inputs"] return [field.name for field in inputs] except KeyError: return [] def build(self, **kwargs) -> None: self.set_attributes(kwargs) def _get_fallback_input(self, **kwargs): return Input(**kwargs) def to_toolkit(self) -> list[Tool]: component_toolkit = _get_component_toolkit() tools = component_toolkit(component=self).get_tools(callbacks=self.get_langchain_callbacks()) if hasattr(self, TOOLS_METADATA_INPUT_NAME): tools = component_toolkit(component=self, metadata=self.tools_metadata).update_tools_metadata(tools=tools) return tools def get_project_name(self): if hasattr(self, "_tracing_service") and self._tracing_service: return self._tracing_service.project_name return "Langflow" def log(self, message: LoggableType | list[LoggableType], name: str | None = None) -> None: """Logs a message. Args: message (LoggableType | list[LoggableType]): The message to log. name (str, optional): The name of the log. Defaults to None. """ if name is None: name = f"Log {len(self._logs) + 1}" log = Log(message=message, type=get_artifact_type(message), name=name) self._logs.append(log) if self._tracing_service and self._vertex: self._tracing_service.add_log(trace_name=self.trace_name, log=log) if self._event_manager is not None and self._current_output: data = log.model_dump() data["output"] = self._current_output data["component_id"] = self._id self._event_manager.on_log(data=data) def _append_tool_output(self) -> None: if next((output for output in self.outputs if output.name == TOOL_OUTPUT_NAME), None) is None: self.outputs.append( Output( name=TOOL_OUTPUT_NAME, display_name=TOOL_OUTPUT_DISPLAY_NAME, method="to_toolkit", types=["Tool"], ) ) async def send_message(self, message: Message, id_: str | None = None): if (hasattr(self, "graph") and self.graph.session_id) and (message is not None and not message.session_id): message.session_id = self.graph.session_id stored_message = await self._store_message(message) self._stored_message_id = stored_message.id try: complete_message = "" if ( self._should_stream_message(stored_message, message) and message is not None and isinstance(message.text, AsyncIterator | Iterator) ): complete_message = await self._stream_message(message.text, stored_message) stored_message.text = complete_message stored_message = await self._update_stored_message(stored_message) else: # Only send message event for non-streaming messages self._send_message_event(stored_message, id_=id_) except Exception: # remove the message from the database await delete_message(stored_message.id) raise self.status = stored_message return stored_message async def _store_message(self, message: Message) -> Message: flow_id = self.graph.flow_id if hasattr(self, "graph") else None messages = await astore_message(message, flow_id=flow_id) if len(messages) != 1: msg = "Only one message can be stored at a time." raise ValueError(msg) return messages[0] def _send_message_event(self, message: Message, id_: str | None = None, category: str | None = None) -> None: if hasattr(self, "_event_manager") and self._event_manager: data_dict = message.data.copy() if hasattr(message, "data") else message.model_dump() if id_ and not data_dict.get("id"): data_dict["id"] = id_ category = category or data_dict.get("category", None) match category: case "error": self._event_manager.on_error(data=data_dict) case "remove_message": self._event_manager.on_remove_message(data={"id": data_dict["id"]}) case _: self._event_manager.on_message(data=data_dict) def _should_stream_message(self, stored_message: Message, original_message: Message) -> bool: return bool( hasattr(self, "_event_manager") and self._event_manager and stored_message.id and not isinstance(original_message.text, str) ) async def _update_stored_message(self, stored_message: Message) -> Message: message_tables = await aupdate_messages(stored_message) if len(message_tables) != 1: msg = "Only one message can be updated at a time." raise ValueError(msg) message_table = message_tables[0] return await Message.create(**message_table.model_dump()) async def _stream_message(self, iterator: AsyncIterator | Iterator, message: Message) -> str: if not isinstance(iterator, AsyncIterator | Iterator): msg = "The message must be an iterator or an async iterator." raise TypeError(msg) if isinstance(iterator, AsyncIterator): return await self._handle_async_iterator(iterator, message.id, message) try: complete_message = "" first_chunk = True for chunk in iterator: complete_message = self._process_chunk( chunk.content, complete_message, message.id, message, first_chunk=first_chunk ) first_chunk = False except Exception as e: raise StreamingError(cause=e, source=message.properties.source) from e else: return complete_message async def _handle_async_iterator(self, iterator: AsyncIterator, message_id: str, message: Message) -> str: complete_message = "" first_chunk = True async for chunk in iterator: complete_message = self._process_chunk( chunk.content, complete_message, message_id, message, first_chunk=first_chunk ) first_chunk = False return complete_message def _process_chunk( self, chunk: str, complete_message: str, message_id: str, message: Message, *, first_chunk: bool = False ) -> str: complete_message += chunk if self._event_manager: if first_chunk: # Send the initial message only on the first chunk msg_copy = message.model_copy() msg_copy.text = complete_message self._send_message_event(msg_copy, id_=message_id) self._event_manager.on_token( data={ "chunk": chunk, "id": str(message_id), } ) return complete_message async def send_error( self, exception: Exception, session_id: str, trace_name: str, source: Source, ) -> Message: """Send an error message to the frontend.""" flow_id = self.graph.flow_id if hasattr(self, "graph") else None error_message = ErrorMessage( flow_id=flow_id, exception=exception, session_id=session_id, trace_name=trace_name, source=source, ) await self.send_message(error_message) return error_message def _append_tool_to_outputs_map(self): self._outputs_map[TOOL_OUTPUT_NAME] = self._build_tool_output() # add a new input for the tool schema # self.inputs.append(self._build_tool_schema()) def _build_tool_output(self) -> Output: return Output(name=TOOL_OUTPUT_NAME, display_name=TOOL_OUTPUT_DISPLAY_NAME, method="to_toolkit", types=["Tool"]) def _build_tools_metadata_input(self): tools = self.to_toolkit() tool_data = ( self.tools_metadata if hasattr(self, TOOLS_METADATA_INPUT_NAME) else [{"name": tool.name, "description": tool.description} for tool in tools] ) try: from langflow.io import TableInput except ImportError as e: msg = "Failed to import TableInput from langflow.io" raise ImportError(msg) from e return TableInput( name=TOOLS_METADATA_INPUT_NAME, display_name="Tools Metadata", real_time_refresh=True, table_schema=TOOL_TABLE_SCHEMA, value=tool_data, )