from typing import TYPE_CHECKING, Any from loguru import logger from langflow.base.flow_processing.utils import build_data_from_result_data from langflow.custom import CustomComponent from langflow.graph.graph.base import Graph from langflow.graph.vertex.base import Vertex from langflow.helpers.flow import get_flow_inputs from langflow.schema import Data from langflow.schema.dotdict import dotdict from langflow.template.field.base import Input if TYPE_CHECKING: from langflow.graph.schema import RunOutputs class SubFlowComponent(CustomComponent): display_name = "Sub Flow" description = ( "Dynamically Generates a Component from a Flow. The output is a list of data with keys 'result' and 'message'." ) beta: bool = True field_order = ["flow_name"] name = "SubFlow" def get_flow_names(self) -> list[str]: flow_datas = self.list_flows() return [flow_data.data["name"] for flow_data in flow_datas] def get_flow(self, flow_name: str) -> Data | None: flow_datas = self.list_flows() for flow_data in flow_datas: if flow_data.data["name"] == flow_name: return flow_data return None def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None): logger.debug(f"Updating build config with field value {field_value} and field name {field_name}") if field_name == "flow_name": build_config["flow_name"]["options"] = self.get_flow_names() # Clean up the build config for key in list(build_config.keys()): if key not in {*self.field_order, "code", "_type", "get_final_results_only"}: del build_config[key] if field_value is not None and field_name == "flow_name": try: flow_data = self.get_flow(field_value) except Exception: # noqa: BLE001 logger.exception(f"Error getting flow {field_value}") else: if not flow_data: msg = f"Flow {field_value} not found." logger.error(msg) else: try: graph = Graph.from_payload(flow_data.data["data"]) # Get all inputs from the graph inputs = get_flow_inputs(graph) # Add inputs to the build config build_config = self.add_inputs_to_build_config(inputs, build_config) except Exception: # noqa: BLE001 logger.exception(f"Error building graph for flow {field_value}") return build_config def add_inputs_to_build_config(self, inputs: list[Vertex], build_config: dotdict): new_fields: list[Input] = [] for vertex in inputs: field = Input( display_name=vertex.display_name, name=vertex.id, info=vertex.description, field_type="str", value=None, ) new_fields.append(field) logger.debug(new_fields) for field in new_fields: build_config[field.name] = field.to_dict() return build_config def build_config(self): return { "input_value": { "display_name": "Input Value", "multiline": True, }, "flow_name": { "display_name": "Flow Name", "info": "The name of the flow to run.", "options": [], "real_time_refresh": True, "refresh_button": True, }, "tweaks": { "display_name": "Tweaks", "info": "Tweaks to apply to the flow.", }, "get_final_results_only": { "display_name": "Get Final Results Only", "info": "If False, the output will contain all outputs from the flow.", "advanced": True, }, } async def build(self, flow_name: str, **kwargs) -> list[Data]: tweaks = {key: {"input_value": value} for key, value in kwargs.items()} run_outputs: list[RunOutputs | None] = await self.run_flow( tweaks=tweaks, flow_name=flow_name, ) if not run_outputs: return [] run_output = run_outputs[0] data = [] if run_output is not None: for output in run_output.outputs: if output: data.extend(build_data_from_result_data(output)) self.status = data logger.debug(data) return data