Spaces:
Running
Running
from typing import Any | |
from loguru import logger | |
from langflow.base.flow_processing.utils import build_data_from_result_data | |
from langflow.custom import Component | |
from langflow.graph.graph.base import Graph | |
from langflow.graph.vertex.base import Vertex | |
from langflow.helpers.flow import get_flow_inputs | |
from langflow.io import DropdownInput, Output | |
from langflow.schema import Data, dotdict | |
class SubFlowComponent(Component): | |
display_name = "Sub Flow" | |
description = "Generates a Component from a Flow, with all of its inputs, and " | |
name = "SubFlow" | |
beta: bool = True | |
icon = "Workflow" | |
def get_flow_names(self) -> list[str]: | |
flow_data = self.list_flows() | |
return [flow_data.data["name"] for flow_data in flow_data] | |
def get_flow(self, flow_name: str) -> Data | None: | |
flow_datas = self.list_flows() | |
for flow_data in flow_datas: | |
if flow_data.data["name"] == flow_name: | |
return flow_data | |
return None | |
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None): | |
if field_name == "flow_name": | |
build_config["flow_name"]["options"] = self.get_flow_names() | |
for key in list(build_config.keys()): | |
if key not in [x.name for x in self.inputs] + ["code", "_type", "get_final_results_only"]: | |
del build_config[key] | |
if field_value is not None and field_name == "flow_name": | |
try: | |
flow_data = self.get_flow(field_value) | |
except Exception: # noqa: BLE001 | |
logger.exception(f"Error getting flow {field_value}") | |
else: | |
if not flow_data: | |
msg = f"Flow {field_value} not found." | |
logger.error(msg) | |
else: | |
try: | |
graph = Graph.from_payload(flow_data.data["data"]) | |
# Get all inputs from the graph | |
inputs = get_flow_inputs(graph) | |
# Add inputs to the build config | |
build_config = self.add_inputs_to_build_config(inputs, build_config) | |
except Exception: # noqa: BLE001 | |
logger.exception(f"Error building graph for flow {field_value}") | |
return build_config | |
def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict): | |
new_fields: list[dotdict] = [] | |
for vertex in inputs_vertex: | |
new_vertex_inputs = [] | |
field_template = vertex.data["node"]["template"] | |
for inp in field_template: | |
if inp not in {"code", "_type"}: | |
field_template[inp]["display_name"] = ( | |
vertex.display_name + " - " + field_template[inp]["display_name"] | |
) | |
field_template[inp]["name"] = vertex.id + "|" + inp | |
new_vertex_inputs.append(field_template[inp]) | |
new_fields += new_vertex_inputs | |
for field in new_fields: | |
build_config[field["name"]] = field | |
return build_config | |
inputs = [ | |
DropdownInput( | |
name="flow_name", | |
display_name="Flow Name", | |
info="The name of the flow to run.", | |
options=[], | |
refresh_button=True, | |
real_time_refresh=True, | |
), | |
] | |
outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")] | |
async def generate_results(self) -> list[Data]: | |
tweaks: dict = {} | |
for field in self._attributes: | |
if field != "flow_name" and "|" in field: | |
[node, name] = field.split("|") | |
if node not in tweaks: | |
tweaks[node] = {} | |
tweaks[node][name] = self._attributes[field] | |
flow_name = self._attributes.get("flow_name") | |
run_outputs = await self.run_flow( | |
tweaks=tweaks, | |
flow_name=flow_name, | |
output_type="all", | |
) | |
data: list[Data] = [] | |
if not run_outputs: | |
return data | |
run_output = run_outputs[0] | |
if run_output is not None: | |
for output in run_output.outputs: | |
if output: | |
data.extend(build_data_from_result_data(output)) | |
return data | |