Spaces:
Running
Running
import ast | |
import contextlib | |
import re | |
import traceback | |
from typing import Any | |
from uuid import UUID | |
from fastapi import HTTPException | |
from loguru import logger | |
from pydantic import BaseModel | |
from langflow.custom import CustomComponent | |
from langflow.custom.custom_component.component import Component | |
from langflow.custom.directory_reader.utils import ( | |
abuild_custom_component_list_from_path, | |
build_custom_component_list_from_path, | |
merge_nested_dicts_with_renaming, | |
) | |
from langflow.custom.eval import eval_custom_component_code | |
from langflow.custom.schema import MissingDefault | |
from langflow.field_typing.range_spec import RangeSpec | |
from langflow.helpers.custom import format_type | |
from langflow.schema import dotdict | |
from langflow.template.field.base import Input | |
from langflow.template.frontend_node.custom_components import ComponentFrontendNode, CustomComponentFrontendNode | |
from langflow.type_extraction.type_extraction import extract_inner_type | |
from langflow.utils import validate | |
from langflow.utils.util import get_base_classes | |
class UpdateBuildConfigError(Exception):
    """Raised when a component's ``update_build_config`` call fails."""
def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None:
    """Register a display label on the frontend node for each return type.

    ``str`` is labelled ``"Text"``; an object exposing ``__name__`` uses that
    name; anything else is labelled with the name of its class.

    Raises:
        HTTPException: 400 as soon as an entry of ``return_types`` is ``None``.
    """

    def _label_for(kind) -> str:
        # Same precedence as the if/elif chain: str, __name__, class name, str().
        if kind is str:
            return "Text"
        if hasattr(kind, "__name__"):
            return kind.__name__
        if hasattr(kind, "__class__"):
            return kind.__class__.__name__
        return str(kind)

    for candidate in return_types:
        if candidate is None:
            error_detail = {
                "error": ("Invalid return type. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            }
            raise HTTPException(status_code=400, detail=error_detail)
        frontend_node.add_output_type(_label_for(candidate))
def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: list[str]) -> None:
    """Reorder the node's template fields to follow ``field_order``.

    Fields named in ``field_order`` come first, in that order; names with no
    matching field are ignored. Every remaining field is appended afterwards in
    its current relative order. When ``field_order`` is empty the node is left
    untouched (``frontend_node.field_order`` is not assigned either).

    Args:
        frontend_node: Node whose ``template.fields`` and ``field_order`` are rewritten.
        field_order: Desired field names, in display order.
    """
    if not field_order:
        return

    current_fields = frontend_node.template.fields
    # Name -> field map so each requested name resolves in O(1).
    fields_by_name = {field.name: field for field in current_fields}
    # Set of requested names so the "leftover" filter below is O(1) per field
    # instead of scanning the ``field_order`` list for every field.
    requested_names = set(field_order)

    reordered = [fields_by_name[name] for name in field_order if name in fields_by_name]
    reordered.extend(field for field in current_fields if field.name not in requested_names)

    frontend_node.template.fields = reordered
    frontend_node.field_order = field_order
def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None:
    """Add the base classes of every return type to the frontend node.

    For each return type the names given by ``get_base_classes`` are added;
    ``str`` additionally contributes ``"Text"``.

    Raises:
        HTTPException: 400 as soon as an entry of ``return_types`` is ``None``.
    """
    for return_type in return_types:
        if return_type is None:
            error_detail = {
                "error": ("Invalid return type. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            }
            raise HTTPException(status_code=400, detail=error_detail)

        class_names = get_base_classes(return_type)
        if return_type is str:
            class_names.append("Text")

        for class_name in class_names:
            frontend_node.add_base_class(class_name)
def extract_type_from_optional(field_type):
    """Unwrap a type string of the form ``"Optional[<type>]"``.

    Parameters:
        field_type (str): The type string to inspect.

    Returns:
        str: The text between the first ``[`` and the final ``]`` when
        ``field_type`` mentions "optional" (case-insensitive) and ends with a
        bracketed section; otherwise ``field_type`` unchanged.
    """
    if "optional" in field_type.lower():
        bracketed = re.search(r"\[(.*?)\]$", field_type)
        if bracketed:
            return bracketed.group(1)
    return field_type
def get_field_properties(extra_field):
    """Derive ``(name, type, value, required)`` from an argument description.

    ``extra_field`` must contain ``"name"``; ``"type"`` defaults to ``"str"``
    and ``"default"`` to ``""``.

    A field is required only when its type does not mention "optional" and its
    default is a ``MissingDefault``. A ``MissingDefault`` default is reported
    as ``None``. For non-required fields an ``Optional[...]`` wrapper is
    stripped from the type. Any other default is passed through
    ``ast.literal_eval`` on a best-effort basis: if evaluation fails, the
    default is returned as-is.
    """
    name = extra_field["name"]
    type_name = extra_field.get("type", "str")
    default = extra_field.get("default", "")

    default_is_missing = isinstance(default, MissingDefault)
    is_required = "optional" not in type_name.lower() and default_is_missing
    value = None if default_is_missing else default

    if not is_required:
        type_name = extract_type_from_optional(type_name)

    if value is not None:
        # Best effort: keep the raw default when it is not a Python literal.
        with contextlib.suppress(Exception):
            value = ast.literal_eval(value)

    return name, type_name, value, is_required
def process_type(field_type: str):
    """Normalize a field type string.

    Types starting with ``list``/``List`` are reduced via ``extract_inner_type``.
    ``Prompt`` and ``Code`` (in any casing) are returned lowercased. Every other
    type is returned unchanged.
    """
    if field_type.startswith(("list", "List")):
        return extract_inner_type(field_type)

    # The type may be spelled "Prompt"/"Code"; only those two are lowercased.
    lowered = field_type.lower()
    return lowered if lowered in {"prompt", "code"} else field_type
def add_new_custom_field(
    *,
    frontend_node: CustomComponentFrontendNode,
    field_name: str,
    field_type: str,
    field_value: Any,
    field_required: bool,
    field_config: dict,
):
    """Build an ``Input`` for one field and upsert it into the node's template.

    Values in ``field_config`` override the given defaults for ``value``,
    ``required``, ``advanced``, ``placeholder`` and ``display_name``. When
    ``field_type`` is empty it is taken from the config's ``type`` or
    ``field_type`` entry. Note that ``field_config`` is modified in place:
    the consumed keys are popped and ``is_list`` (plus the ``SecretStr``
    extras) are written into it.

    Returns:
        The same ``frontend_node`` that was passed in.
    """
    label = field_config.pop("display_name", None)

    if not field_type:
        # Fall back to the type declared in the config; "type" wins over "field_type".
        for type_key in ("type", "field_type"):
            if type_key in field_config and field_config[type_key] is not None:
                field_type = field_config.pop(type_key)
                break

    # Must be computed before process_type(), which strips the list wrapper.
    mentions_list = "list" in field_type.lower()
    field_type = process_type(field_type)

    field_value = field_config.pop("value", field_value)
    is_advanced = field_config.pop("advanced", False)

    if field_type == "Dict":
        field_type = "dict"
    if field_type == "bool" and field_value is None:
        field_value = False
    if field_type == "SecretStr":
        field_config["password"] = True
        field_config["load_from_db"] = True
        field_config["input_types"] = ["Text"]

    # A list of options means a dropdown/multiselect; otherwise fall back to
    # the explicit "list" flag or the list-ness of the declared type.
    has_option_list = isinstance(field_config.get("options"), list)
    field_config["is_list"] = has_option_list or field_config.get("list", False) or mentions_list

    if "name" in field_config:
        logger.warning("The 'name' key in field_config is used to build the object and can't be changed.")

    is_required = field_config.pop("required", field_required)
    placeholder_text = field_config.pop("placeholder", "")

    remaining_options = sanitize_field_config(field_config)
    custom_input = Input(
        name=field_name,
        field_type=field_type,
        value=field_value,
        show=True,
        required=is_required,
        advanced=is_advanced,
        placeholder=placeholder_text,
        display_name=label,
        **remaining_options,
    )
    frontend_node.template.upsert_field(field_name, custom_input)

    if isinstance(frontend_node.custom_fields, dict):
        frontend_node.custom_fields[field_name] = None

    return frontend_node
def add_extra_fields(frontend_node, field_config, function_args) -> None:
    """Add one template field per entry-point argument to the frontend node.

    Args:
        frontend_node: Node that receives the fields (via ``add_new_custom_field``).
        field_config: Mapping of field name to that field's config.
        function_args: Argument descriptions; entries without a ``"name"`` key
            and the names ``self``, ``kwargs`` and ``args`` are skipped.

    When the function accepts ``kwargs`` and ``field_config`` has keys that are
    not function arguments, the config entries not consumed by an argument are
    added as fields too, except ``"code"`` and entries without a ``"name"`` key.
    """
    if not function_args:
        return

    # Shallow copy: entries are popped as they are matched to an argument.
    unclaimed_config = field_config.copy()
    # Tolerate args without "name" here as well; the loop below already skips
    # them, so collecting names must not raise KeyError first.
    arg_names = {arg["name"] for arg in function_args if "name" in arg}
    ignored_names = {"self", "kwargs", "args"}

    for extra_field in function_args:
        if "name" not in extra_field or extra_field["name"] in ignored_names:
            continue

        field_name, field_type, field_value, field_required = get_field_properties(extra_field)
        config = unclaimed_config.pop(field_name, {})
        frontend_node = add_new_custom_field(
            frontend_node=frontend_node,
            field_name=field_name,
            field_type=field_type,
            field_value=field_value,
            field_required=field_required,
            field_config=config,
        )

    accepts_kwargs = "kwargs" in arg_names
    if accepts_kwargs and not all(key in arg_names for key in field_config):
        for config_key, config in unclaimed_config.items():
            # NOTE(review): the "name" membership test runs on the raw config,
            # before any BaseModel is dumped to a dict — confirm that is intended.
            if "name" not in config or config_key == "code":
                continue
            config_ = config.model_dump() if isinstance(config, BaseModel) else config
            field_name_, field_type, field_value, field_required = get_field_properties(extra_field=config_)
            frontend_node = add_new_custom_field(
                frontend_node=frontend_node,
                field_name=field_name_,
                field_type=field_type,
                field_value=field_value,
                field_required=field_required,
                field_config=config_,
            )
def get_field_dict(field: Input | dict):
    """Return ``field`` as a dictionary.

    An ``Input`` is dumped (by alias, ``None`` values excluded) and wrapped in
    a ``dotdict``; anything else is returned unchanged.
    """
    if not isinstance(field, Input):
        return field
    dumped = field.model_dump(by_alias=True, exclude_none=True)
    return dotdict(dumped)
def run_build_inputs(
    custom_component: Component,
):
    """Call ``build_inputs()`` on the component and return its result.

    Raises:
        HTTPException: 500, with the original error message as detail, when
            ``build_inputs()`` raises.
    """
    try:
        built_inputs = custom_component.build_inputs()
    except Exception as exc:
        logger.exception("Error running build inputs")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return built_inputs
def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None):
    """Evaluate a component's source code and instantiate the resulting class.

    Args:
        custom_component: Object whose ``_code`` attribute holds the component source.
        user_id: Passed to the evaluated class as ``_user_id``.

    Returns:
        The result of calling the evaluated class with ``_user_id`` and ``_code``.

    Raises:
        HTTPException: 400 when ``_code`` is ``None`` or not a string, or when
            evaluating the code fails.
        Exception: Any error raised while instantiating the class is logged and
            re-raised unchanged.
    """
    code = custom_component._code

    if isinstance(code, str):
        try:
            custom_class = eval_custom_component_code(code)
        except Exception as exc:
            logger.exception("Error while evaluating custom component code")
            raise HTTPException(
                status_code=400,
                detail={
                    "error": ("Invalid type conversion. Please check your code and try again."),
                    "traceback": traceback.format_exc(),
                },
            ) from exc

        try:
            return custom_class(_user_id=user_id, _code=code)
        except Exception as exc:
            logger.exception("Error while instantiating custom component")
            # ``detail`` may be a plain string or None; only index it when it is
            # a dict, so a TypeError here cannot mask the original exception.
            detail = getattr(exc, "detail", None)
            if isinstance(detail, dict) and "traceback" in detail:
                logger.error(detail["traceback"])
            raise

    error = "Code is None" if code is None else "Invalid code type"
    msg = f"Invalid type conversion: {error}. Please check your code and try again."
    logger.error(msg)
    raise HTTPException(
        status_code=400,
        detail={"error": msg},
    )
def run_build_config(
    custom_component: CustomComponent,
    user_id: str | UUID | None = None,
) -> tuple[dict, CustomComponent]:
    """Build the field configuration for a custom component.

    The component's ``_code`` is evaluated, the resulting class is instantiated
    with ``_user_id`` and its ``build_config()`` is called. Each entry of the
    returned config is normalized with ``get_field_dict``, and a ``RangeSpec``
    stored under ``"rangeSpec"`` is replaced by its ``model_dump()``.

    Returns:
        ``(build_config, custom_instance)``.

    Raises:
        HTTPException: 400 when ``_code`` is ``None`` or not a string, or when
            evaluating the code fails.
        Exception: Any error raised while instantiating the class or building
            the config is logged and re-raised unchanged.
    """
    code = custom_component._code

    if isinstance(code, str):
        try:
            custom_class = eval_custom_component_code(code)
        except Exception as exc:
            logger.exception("Error while evaluating custom component code")
            raise HTTPException(
                status_code=400,
                detail={
                    "error": ("Invalid type conversion. Please check your code and try again."),
                    "traceback": traceback.format_exc(),
                },
            ) from exc

        try:
            custom_instance = custom_class(_user_id=user_id)
            build_config: dict = custom_instance.build_config()

            # Iterate over a copy because entries are reassigned inside the loop.
            for field_name, field in build_config.copy().items():
                # Users may supply either an Input or a dict with the same keys.
                field_dict = get_field_dict(field)
                # A RangeSpec object is replaced by its dumped form.
                if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec):
                    field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump()
                build_config[field_name] = field_dict
        except Exception as exc:
            logger.exception("Error while building field config")
            # ``detail`` may be a plain string or None; only index it when it is
            # a dict, so a TypeError here cannot mask the original exception.
            detail = getattr(exc, "detail", None)
            if isinstance(detail, dict) and "traceback" in detail:
                logger.error(detail["traceback"])
            raise
        return build_config, custom_instance

    error = "Code is None" if code is None else "Invalid code type"
    msg = f"Invalid type conversion: {error}. Please check your code and try again."
    logger.error(msg)
    raise HTTPException(
        status_code=400,
        detail={"error": msg},
    )
def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code):
    """Add the ``code`` field, holding ``raw_code``, to the node's template.

    The field is a required, advanced, multiline, dynamic ``Input`` of type
    ``"code"``. Returns the same ``frontend_node``.
    """
    frontend_node.template.add_field(
        Input(
            name="code",
            field_type="code",
            value=raw_code,
            required=True,
            advanced=True,
            dynamic=True,
            multiline=True,
            password=False,
            is_list=False,
            placeholder="",
        )
    )
    return frontend_node
def build_custom_component_template_from_inputs(
    custom_component: Component | CustomComponent, user_id: str | UUID | None = None
):
    """Build the template of a component that declares its fields as inputs.

    Returns:
        ``(template_dict, component_instance)`` where ``template_dict`` is the
        frontend node serialized with ``keep_name=False``.
    """
    # The list of inputs plays the role of both build_config and entrypoint args.
    component = get_component_instance(custom_component, user_id=user_id)
    template_config = component.get_template_config(component)

    node = ComponentFrontendNode.from_inputs(**template_config)
    node = add_code_field(node, custom_component._code)

    # Outputs without declared types get them from their method's return type.
    for output in node.outputs:
        if not output.types:
            method_return_types = component.get_method_return_type(output.method)
            output.add_types([format_type(return_type) for return_type in method_return_types])
            output.set_selected()

    # Input and output names must not overlap.
    node.validate_component()
    # ! This should be removed when we have a better way to handle this
    node.set_base_classes_from_outputs()
    reorder_fields(node, component._get_field_order())

    return node.to_dict(keep_name=False), component
def build_custom_component_template(
    custom_component: CustomComponent,
    user_id: str | UUID | None = None,
) -> tuple[dict[str, Any], CustomComponent | Component]:
    """Build a custom component template.

    Components whose ``template_config`` contains ``"inputs"`` are delegated to
    ``build_custom_component_template_from_inputs``. Otherwise the template is
    assembled from the build config, the entry-point arguments and the
    entry-point return types.

    Returns:
        ``(template_dict, component_instance)``.

    Raises:
        HTTPException: 400 when the component has no ``template_config`` or
            when building fails. An ``HTTPException`` raised while building is
            propagated unchanged.
    """
    try:
        has_template_config = hasattr(custom_component, "template_config")
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail={
                "error": (f"Error building Component: {exc}"),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    if not has_template_config:
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Error building Component. Please check if you are importing Component correctly."),
            },
        )

    try:
        if "inputs" in custom_component.template_config:
            return build_custom_component_template_from_inputs(custom_component, user_id=user_id)

        node = CustomComponentFrontendNode(**custom_component.template_config)
        build_config, component_instance = run_build_config(custom_component, user_id=user_id)

        add_extra_fields(node, build_config, custom_component.get_function_entrypoint_args)
        node = add_code_field(node, custom_component._code)

        add_base_classes(node, custom_component._get_function_entrypoint_return_type)
        add_output_types(node, custom_component._get_function_entrypoint_return_type)

        reorder_fields(node, component_instance._get_field_order())
        return node.to_dict(keep_name=False), component_instance
    except HTTPException:
        # Already an HTTP error: let it through untouched.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail={
                "error": (f"Error building Component: {exc}"),
                "traceback": traceback.format_exc(),
            },
        ) from exc
def create_component_template(component):
    """Create a template for a component description.

    ``component`` must provide ``"code"`` and ``"output_types"``. When the
    built template has no output types, the ones from ``component`` are used.

    Returns:
        ``(template, instance)`` as produced by ``build_custom_component_template``.
    """
    source_code = component["code"]
    declared_output_types = component["output_types"]

    extractor = Component(_code=source_code)
    template, instance = build_custom_component_template(extractor)

    if declared_output_types and not template["output_types"]:
        template["output_types"] = declared_output_types

    return template, instance
def build_custom_components(components_paths: list[str]):
    """Build custom components from the specified paths.

    Each distinct path (compared by its ``str()`` form) is loaded once and the
    results are merged with ``merge_nested_dicts_with_renaming``. Returns an
    empty dict when no paths are given.
    """
    if not components_paths:
        return {}

    logger.info(f"Building custom components from {components_paths}")

    merged_components: dict = {}
    seen_paths = set()

    for raw_path in components_paths:
        path_key = str(raw_path)
        if path_key in seen_paths:
            continue

        found = build_custom_component_list_from_path(path_key)
        if found:
            category = next(iter(found))
            logger.info(f"Loading {len(found[category])} component(s) from category {category}")
            merged_components = merge_nested_dicts_with_renaming(merged_components, found)

        seen_paths.add(path_key)

    return merged_components
async def abuild_custom_components(components_paths: list[str]):
    """Build custom components from the specified paths (async variant).

    Each distinct path (compared by its ``str()`` form) is loaded once with the
    awaited ``abuild_custom_component_list_from_path`` and the results are
    merged with ``merge_nested_dicts_with_renaming``. Returns an empty dict
    when no paths are given.
    """
    if not components_paths:
        return {}

    logger.info(f"Building custom components from {components_paths}")

    merged_components: dict = {}
    seen_paths = set()

    for raw_path in components_paths:
        path_key = str(raw_path)
        if path_key in seen_paths:
            continue

        found = await abuild_custom_component_list_from_path(path_key)
        if found:
            category = next(iter(found))
            logger.info(f"Loading {len(found[category])} component(s) from category {category}")
            merged_components = merge_nested_dicts_with_renaming(merged_components, found)

        seen_paths.add(path_key)

    return merged_components
def update_field_dict( | |
custom_component_instance: "CustomComponent", | |
field_dict: dict, | |
build_config: dict, | |
*, | |
update_field: str | None = None, | |
update_field_value: Any | None = None, | |
call: bool = False, | |
): | |
"""Update the field dictionary by calling options() or value() if they are callable.""" | |
if ( | |
("real_time_refresh" in field_dict or "refresh_button" in field_dict) | |
and any( | |
( | |
field_dict.get("real_time_refresh", False), | |
field_dict.get("refresh_button", False), | |
) | |
) | |
and call | |
): | |
try: | |
dd_build_config = dotdict(build_config) | |
custom_component_instance.update_build_config( | |
build_config=dd_build_config, | |
field_value=update_field, | |
field_name=update_field_value, | |
) | |
build_config = dd_build_config | |
except Exception as exc: | |
msg = f"Error while running update_build_config: {exc}" | |
logger.exception(msg) | |
raise UpdateBuildConfigError(msg) from exc | |
return build_config | |
def sanitize_field_config(field_config: dict | Input):
    """Strip the keys that are passed to ``Input`` explicitly.

    An ``Input`` is first converted with ``to_dict()``; a dict is modified in
    place. The keys ``name``, ``field_type``, ``value``, ``required``,
    ``placeholder``, ``display_name``, ``advanced``, ``show`` and ``type`` are
    removed when present.

    Returns:
        The dictionary without those keys (the same object for dict input).
    """
    field_dict = field_config.to_dict() if isinstance(field_config, Input) else field_config

    # "field_type" and "type" are dropped because the type was extracted already.
    reserved_keys = (
        "name",
        "field_type",
        "value",
        "required",
        "placeholder",
        "display_name",
        "advanced",
        "show",
        "type",
    )
    for reserved_key in reserved_keys:
        field_dict.pop(reserved_key, None)

    return field_dict
def build_component(component):
    """Build a single component and return ``(name, template)``.

    The name comes from ``get_instance_name`` applied to the instance created
    by ``create_component_template``.
    """
    template, instance = create_component_template(component)
    return get_instance_name(instance), template
def get_function(code):
    """Create the function defined in ``code``.

    The function name is extracted with ``validate.extract_function_name`` and
    the function itself is built with ``validate.create_function``.
    """
    name = validate.extract_function_name(code)
    function = validate.create_function(code, name)
    return function
def get_instance_name(instance):
    """Return the instance's truthy ``name`` attribute, else its class name."""
    custom_name = getattr(instance, "name", None)
    if custom_name:
        return custom_name
    return instance.__class__.__name__