Tai Truong
fix readme
d202ada
from typing import Any
from loguru import logger
from langflow.base.flow_processing.utils import build_data_from_result_data
from langflow.custom import Component
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
from langflow.helpers.flow import get_flow_inputs
from langflow.io import DropdownInput, Output
from langflow.schema import Data, dotdict
class SubFlowComponent(Component):
    """Component that surfaces another flow's inputs as its own fields and runs that flow.

    When a flow is picked in the ``flow_name`` dropdown, ``update_build_config``
    builds a graph from the flow's payload and adds one build-config entry per
    template field of each input vertex, keyed ``"<vertex id>|<field name>"``.
    ``generate_results`` later regroups those entries into per-node tweaks and
    passes them to ``run_flow``.
    """

    display_name = "Sub Flow"
    # NOTE(review): this text ends mid-sentence ("..., and "); confirm the intended wording.
    description = "Generates a Component from a Flow, with all of its inputs, and "
    name = "SubFlow"
    beta: bool = True
    icon = "Workflow"

    def get_flow_names(self) -> list[str]:
        """Return the ``name`` entry of every flow returned by ``self.list_flows()``."""
        return [flow.data["name"] for flow in self.list_flows()]

    def get_flow(self, flow_name: str) -> Data | None:
        """Return the first listed flow whose name equals ``flow_name``.

        Returns:
            The matching flow record, or ``None`` when no listed flow has that name.
        """
        for flow in self.list_flows():
            if flow.data["name"] == flow_name:
                return flow
        return None

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Refresh the dropdown options and rebuild the fields derived from the selected flow.

        Args:
            build_config: The build config to update; it is modified in place.
            field_value: New value of the changed field (the flow name when
                ``field_name`` is ``"flow_name"``).
            field_name: Name of the field that triggered the update.

        Returns:
            The updated build config. Lookup and graph-building failures are
            logged and the config is returned without the flow's fields.
        """
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = self.get_flow_names()

        # Drop every entry that is not a declared input or a reserved key, so
        # fields added for a previously selected flow do not linger.
        # The permitted set is loop-invariant, so build it once.
        permitted = {declared.name for declared in self.inputs} | {"code", "_type", "get_final_results_only"}
        for key in list(build_config):
            if key not in permitted:
                del build_config[key]

        if field_value is None or field_name != "flow_name":
            return build_config

        try:
            flow_data = self.get_flow(field_value)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting flow {field_value}")
            return build_config

        if not flow_data:
            msg = f"Flow {field_value} not found."
            logger.error(msg)
            return build_config

        try:
            graph = Graph.from_payload(flow_data.data["data"])
            # Get all inputs from the graph
            flow_inputs = get_flow_inputs(graph)
            # Add inputs to the build config
            build_config = self.add_inputs_to_build_config(flow_inputs, build_config)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error building graph for flow {field_value}")
        return build_config

    def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict):
        """Add every template field of the given vertices to ``build_config``.

        Each field except ``code`` and ``_type`` is renamed to
        ``"<vertex id>|<field name>"`` and its display name is prefixed with the
        vertex display name. The field dicts inside each vertex template are
        modified in place and the same objects are stored in ``build_config``.

        Args:
            inputs_vertex: Vertices whose ``data["node"]["template"]`` supplies the fields.
            build_config: The build config receiving the fields; modified in place.

        Returns:
            The same ``build_config`` object.
        """
        new_fields: list[dotdict] = []
        for vertex in inputs_vertex:
            template = vertex.data["node"]["template"]
            for field_key in template:
                if field_key in {"code", "_type"}:
                    continue
                field = template[field_key]
                field["display_name"] = vertex.display_name + " - " + field["display_name"]
                field["name"] = vertex.id + "|" + field_key
                new_fields.append(field)
        for field in new_fields:
            build_config[field["name"]] = field
        return build_config

    inputs = [
        DropdownInput(
            name="flow_name",
            display_name="Flow Name",
            info="The name of the flow to run.",
            options=[],
            refresh_button=True,
            real_time_refresh=True,
        ),
    ]

    outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")]

    async def generate_results(self) -> list[Data]:
        """Run the selected flow with tweaks collected from this component's attributes.

        Every attribute whose key contains ``"|"`` is read as
        ``"<node>|<field name>"`` and grouped into ``tweaks[node][field name]``.

        Returns:
            Data built from each truthy output of the first run result, or an
            empty list when ``run_flow`` returns nothing or the first result is ``None``.
        """
        tweaks: dict = {}
        for field, value in self._attributes.items():
            if field == "flow_name" or "|" not in field:
                continue
            # Split on the first "|" only: a field name that itself contains
            # "|" must not break the two-way unpack.
            node, _, name = field.partition("|")
            tweaks.setdefault(node, {})[name] = value

        flow_name = self._attributes.get("flow_name")
        run_outputs = await self.run_flow(
            tweaks=tweaks,
            flow_name=flow_name,
            output_type="all",
        )

        data: list[Data] = []
        if not run_outputs:
            return data

        # Only the first run result is consumed.
        run_output = run_outputs[0]
        if run_output is not None:
            for output in run_output.outputs:
                if output:
                    data.extend(build_data_from_result_data(output))
        return data