Tai Truong
fix readme
d202ada
from typing import TYPE_CHECKING, Any
from loguru import logger
from langflow.base.flow_processing.utils import build_data_from_result_data
from langflow.custom import CustomComponent
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
from langflow.helpers.flow import get_flow_inputs
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
from langflow.template.field.base import Input
if TYPE_CHECKING:
from langflow.graph.schema import RunOutputs
class SubFlowComponent(CustomComponent):
    """Component whose fields are generated from the inputs of another flow.

    Selecting a flow through the ``flow_name`` field rebuilds the build
    config so that every input vertex of that flow becomes a string field;
    ``build`` then runs the flow with those field values passed as tweaks.
    """

    display_name = "Sub Flow"
    description = (
        "Dynamically Generates a Component from a Flow. The output is a list of data with keys 'result' and 'message'."
    )
    beta: bool = True
    field_order = ["flow_name"]
    name = "SubFlow"

    def get_flow_names(self) -> list[str]:
        """Return the ``"name"`` entry of every flow yielded by ``self.list_flows()``."""
        return [entry.data["name"] for entry in self.list_flows()]

    def get_flow(self, flow_name: str) -> Data | None:
        """Return the first listed flow whose ``"name"`` equals *flow_name*.

        Returns:
            The matching flow record, or ``None`` when no flow has that name.
        """
        candidates = self.list_flows()
        return next((entry for entry in candidates if entry.data["name"] == flow_name), None)

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Refresh the build config after a field has changed.

        Args:
            build_config: Current build config; modified in place and returned.
            field_value: New value of the changed field (a flow name when
                ``field_name`` is ``"flow_name"``).
            field_name: Name of the field that changed, if any.

        Returns:
            The updated build config. Errors while looking up the flow or
            building its graph are logged and the config is returned without
            the flow's input fields.
        """
        logger.debug(f"Updating build config with field value {field_value} and field name {field_name}")
        flow_name_changed = field_name == "flow_name"
        if flow_name_changed:
            build_config["flow_name"]["options"] = self.get_flow_names()

        # Clean up the build config: drop every entry that is not a fixed field,
        # so fields generated for a previously selected flow do not linger.
        retained = {*self.field_order, "code", "_type", "get_final_results_only"}
        stale_keys = [key for key in build_config if key not in retained]
        for key in stale_keys:
            del build_config[key]

        # Only a concrete flow selection triggers regeneration of the inputs.
        if field_value is None or not flow_name_changed:
            return build_config

        try:
            flow_data = self.get_flow(field_value)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting flow {field_value}")
            return build_config

        if not flow_data:
            logger.error(f"Flow {field_value} not found.")
            return build_config

        try:
            graph = Graph.from_payload(flow_data.data["data"])
            # Collect the flow's input vertices and expose each one as a field.
            flow_inputs = get_flow_inputs(graph)
            build_config = self.add_inputs_to_build_config(flow_inputs, build_config)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error building graph for flow {field_value}")
        return build_config

    def add_inputs_to_build_config(self, inputs: list[Vertex], build_config: dotdict):
        """Add one string field per vertex in *inputs* to *build_config*.

        Each field is keyed by the vertex id and carries the vertex's display
        name and description. All fields are created before any of them is
        written into the config.

        Returns:
            The same *build_config* object, with the new entries set.
        """
        generated: list[Input] = [
            Input(
                display_name=vertex.display_name,
                name=vertex.id,
                info=vertex.description,
                field_type="str",
                value=None,
            )
            for vertex in inputs
        ]
        logger.debug(generated)
        for new_field in generated:
            build_config[new_field.name] = new_field.to_dict()
        return build_config

    def build_config(self):
        """Return the static field definitions of the component."""
        input_value_cfg = {
            "display_name": "Input Value",
            "multiline": True,
        }
        flow_name_cfg = {
            "display_name": "Flow Name",
            "info": "The name of the flow to run.",
            "options": [],
            "real_time_refresh": True,
            "refresh_button": True,
        }
        tweaks_cfg = {
            "display_name": "Tweaks",
            "info": "Tweaks to apply to the flow.",
        }
        final_results_cfg = {
            "display_name": "Get Final Results Only",
            "info": "If False, the output will contain all outputs from the flow.",
            "advanced": True,
        }
        return {
            "input_value": input_value_cfg,
            "flow_name": flow_name_cfg,
            "tweaks": tweaks_cfg,
            "get_final_results_only": final_results_cfg,
        }

    async def build(self, flow_name: str, **kwargs) -> list[Data]:
        """Run the flow called *flow_name* and collect its results.

        Args:
            flow_name: Name of the flow handed to ``self.run_flow``.
            **kwargs: Remaining field values; each becomes a tweak of the form
                ``{key: {"input_value": value}}``.

        Returns:
            The data built from the truthy outputs of the first run output,
            or an empty list when ``run_flow`` returns nothing.
        """
        tweaks = {}
        for key, value in kwargs.items():
            tweaks[key] = {"input_value": value}

        run_outputs: list[RunOutputs | None] = await self.run_flow(tweaks=tweaks, flow_name=flow_name)
        if not run_outputs:
            return []

        # Only the first run output is converted.
        first_run = run_outputs[0]
        collected = []
        if first_run is not None:
            for output in first_run.outputs:
                if not output:
                    continue
                collected.extend(build_data_from_result_data(output))

        self.status = collected
        logger.debug(collected)
        return collected