import ast import contextlib import re import traceback from typing import Any from uuid import UUID from fastapi import HTTPException from loguru import logger from pydantic import BaseModel from langflow.custom import CustomComponent from langflow.custom.custom_component.component import Component from langflow.custom.directory_reader.utils import ( abuild_custom_component_list_from_path, build_custom_component_list_from_path, merge_nested_dicts_with_renaming, ) from langflow.custom.eval import eval_custom_component_code from langflow.custom.schema import MissingDefault from langflow.field_typing.range_spec import RangeSpec from langflow.helpers.custom import format_type from langflow.schema import dotdict from langflow.template.field.base import Input from langflow.template.frontend_node.custom_components import ComponentFrontendNode, CustomComponentFrontendNode from langflow.type_extraction.type_extraction import extract_inner_type from langflow.utils import validate from langflow.utils.util import get_base_classes class UpdateBuildConfigError(Exception): pass def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None: """Add output types to the frontend node.""" for return_type in return_types: if return_type is None: raise HTTPException( status_code=400, detail={ "error": ("Invalid return type. Please check your code and try again."), "traceback": traceback.format_exc(), }, ) if return_type is str: return_type_ = "Text" elif hasattr(return_type, "__name__"): return_type_ = return_type.__name__ elif hasattr(return_type, "__class__"): return_type_ = return_type.__class__.__name__ else: return_type_ = str(return_type) frontend_node.add_output_type(return_type_) def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: list[str]) -> None: """Reorder fields in the frontend node based on the specified field_order.""" if not field_order: return # Create a dictionary for O(1) lookup time. field_dict = {field.name: field for field in frontend_node.template.fields} reordered_fields = [field_dict[name] for name in field_order if name in field_dict] # Add any fields that are not in the field_order list reordered_fields.extend(field for field in frontend_node.template.fields if field.name not in field_order) frontend_node.template.fields = reordered_fields frontend_node.field_order = field_order def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None: """Add base classes to the frontend node.""" for return_type_instance in return_types: if return_type_instance is None: raise HTTPException( status_code=400, detail={ "error": ("Invalid return type. Please check your code and try again."), "traceback": traceback.format_exc(), }, ) base_classes = get_base_classes(return_type_instance) if return_type_instance is str: base_classes.append("Text") for base_class in base_classes: frontend_node.add_base_class(base_class) def extract_type_from_optional(field_type): """Extract the type from a string formatted as "Optional[]". Parameters: field_type (str): The string from which to extract the type. Returns: str: The extracted type, or an empty string if no type was found. """ if "optional" not in field_type.lower(): return field_type match = re.search(r"\[(.*?)\]$", field_type) return match[1] if match else field_type def get_field_properties(extra_field): """Get the properties of an extra field.""" field_name = extra_field["name"] field_type = extra_field.get("type", "str") field_value = extra_field.get("default", "") # a required field is a field that does not contain # optional in field_type # and a field that does not have a default value field_required = "optional" not in field_type.lower() and isinstance(field_value, MissingDefault) field_value = field_value if not isinstance(field_value, MissingDefault) else None if not field_required: field_type = extract_type_from_optional(field_type) if field_value is not None: with contextlib.suppress(Exception): field_value = ast.literal_eval(field_value) return field_name, field_type, field_value, field_required def process_type(field_type: str): if field_type.startswith(("list", "List")): return extract_inner_type(field_type) # field_type is a string can be Prompt or Code too # so we just need to lower if it is the case lowercase_type = field_type.lower() if lowercase_type in {"prompt", "code"}: return lowercase_type return field_type def add_new_custom_field( *, frontend_node: CustomComponentFrontendNode, field_name: str, field_type: str, field_value: Any, field_required: bool, field_config: dict, ): # Check field_config if any of the keys are in it # if it is, update the value display_name = field_config.pop("display_name", None) if not field_type: if "type" in field_config and field_config["type"] is not None: field_type = field_config.pop("type") elif "field_type" in field_config and field_config["field_type"] is not None: field_type = field_config.pop("field_type") field_contains_list = "list" in field_type.lower() field_type = process_type(field_type) field_value = field_config.pop("value", field_value) field_advanced = field_config.pop("advanced", False) if field_type == "Dict": field_type = "dict" if field_type == "bool" and field_value is None: field_value = False if field_type == "SecretStr": field_config["password"] = True field_config["load_from_db"] = True field_config["input_types"] = ["Text"] # If options is a list, then it's a dropdown or multiselect # If options is None, then it's a list of strings is_list = isinstance(field_config.get("options"), list) field_config["is_list"] = is_list or field_config.get("list", False) or field_contains_list if "name" in field_config: logger.warning("The 'name' key in field_config is used to build the object and can't be changed.") required = field_config.pop("required", field_required) placeholder = field_config.pop("placeholder", "") new_field = Input( name=field_name, field_type=field_type, value=field_value, show=True, required=required, advanced=field_advanced, placeholder=placeholder, display_name=display_name, **sanitize_field_config(field_config), ) frontend_node.template.upsert_field(field_name, new_field) if isinstance(frontend_node.custom_fields, dict): frontend_node.custom_fields[field_name] = None return frontend_node def add_extra_fields(frontend_node, field_config, function_args) -> None: """Add extra fields to the frontend node.""" if not function_args: return field_config_ = field_config.copy() function_args_names = [arg["name"] for arg in function_args] # If kwargs is in the function_args and not all field_config keys are in function_args # then we need to add the extra fields for extra_field in function_args: if "name" not in extra_field or extra_field["name"] in { "self", "kwargs", "args", }: continue field_name, field_type, field_value, field_required = get_field_properties(extra_field) config = field_config_.pop(field_name, {}) frontend_node = add_new_custom_field( frontend_node=frontend_node, field_name=field_name, field_type=field_type, field_value=field_value, field_required=field_required, field_config=config, ) if "kwargs" in function_args_names and not all(key in function_args_names for key in field_config): for field_name, config in field_config_.items(): if "name" not in config or field_name == "code": continue config_ = config.model_dump() if isinstance(config, BaseModel) else config field_name_, field_type, field_value, field_required = get_field_properties(extra_field=config_) frontend_node = add_new_custom_field( frontend_node=frontend_node, field_name=field_name_, field_type=field_type, field_value=field_value, field_required=field_required, field_config=config_, ) def get_field_dict(field: Input | dict): """Get the field dictionary from a Input or a dict.""" if isinstance(field, Input): return dotdict(field.model_dump(by_alias=True, exclude_none=True)) return field def run_build_inputs( custom_component: Component, ): """Run the build inputs of a custom component.""" try: return custom_component.build_inputs() # add_extra_fields(frontend_node, field_config, field_config.values()) except Exception as exc: logger.exception("Error running build inputs") raise HTTPException(status_code=500, detail=str(exc)) from exc def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None): if custom_component._code is None: error = "Code is None" elif not isinstance(custom_component._code, str): error = "Invalid code type" else: try: custom_class = eval_custom_component_code(custom_component._code) except Exception as exc: logger.exception("Error while evaluating custom component code") raise HTTPException( status_code=400, detail={ "error": ("Invalid type conversion. Please check your code and try again."), "traceback": traceback.format_exc(), }, ) from exc try: return custom_class(_user_id=user_id, _code=custom_component._code) except Exception as exc: logger.exception("Error while instantiating custom component") if hasattr(exc, "detail") and "traceback" in exc.detail: logger.error(exc.detail["traceback"]) raise msg = f"Invalid type conversion: {error}. Please check your code and try again." logger.error(msg) raise HTTPException( status_code=400, detail={"error": msg}, ) def run_build_config( custom_component: CustomComponent, user_id: str | UUID | None = None, ) -> tuple[dict, CustomComponent]: """Build the field configuration for a custom component.""" if custom_component._code is None: error = "Code is None" elif not isinstance(custom_component._code, str): error = "Invalid code type" else: try: custom_class = eval_custom_component_code(custom_component._code) except Exception as exc: logger.exception("Error while evaluating custom component code") raise HTTPException( status_code=400, detail={ "error": ("Invalid type conversion. Please check your code and try again."), "traceback": traceback.format_exc(), }, ) from exc try: custom_instance = custom_class(_user_id=user_id) build_config: dict = custom_instance.build_config() for field_name, field in build_config.copy().items(): # Allow user to build Input as well # as a dict with the same keys as Input field_dict = get_field_dict(field) # Let's check if "rangeSpec" is a RangeSpec object if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec): field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump() build_config[field_name] = field_dict except Exception as exc: logger.exception("Error while building field config") if hasattr(exc, "detail") and "traceback" in exc.detail: logger.error(exc.detail["traceback"]) raise return build_config, custom_instance msg = f"Invalid type conversion: {error}. Please check your code and try again." logger.error(msg) raise HTTPException( status_code=400, detail={"error": msg}, ) def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code): code_field = Input( dynamic=True, required=True, placeholder="", multiline=True, value=raw_code, password=False, name="code", advanced=True, field_type="code", is_list=False, ) frontend_node.template.add_field(code_field) return frontend_node def build_custom_component_template_from_inputs( custom_component: Component | CustomComponent, user_id: str | UUID | None = None ): # The List of Inputs fills the role of the build_config and the entrypoint_args cc_instance = get_component_instance(custom_component, user_id=user_id) field_config = cc_instance.get_template_config(cc_instance) frontend_node = ComponentFrontendNode.from_inputs(**field_config) frontend_node = add_code_field(frontend_node, custom_component._code) # But we now need to calculate the return_type of the methods in the outputs for output in frontend_node.outputs: if output.types: continue return_types = cc_instance.get_method_return_type(output.method) return_types = [format_type(return_type) for return_type in return_types] output.add_types(return_types) output.set_selected() # Validate that there is not name overlap between inputs and outputs frontend_node.validate_component() # ! This should be removed when we have a better way to handle this frontend_node.set_base_classes_from_outputs() reorder_fields(frontend_node, cc_instance._get_field_order()) return frontend_node.to_dict(keep_name=False), cc_instance def build_custom_component_template( custom_component: CustomComponent, user_id: str | UUID | None = None, ) -> tuple[dict[str, Any], CustomComponent | Component]: """Build a custom component template.""" try: has_template_config = hasattr(custom_component, "template_config") except Exception as exc: raise HTTPException( status_code=400, detail={ "error": (f"Error building Component: {exc}"), "traceback": traceback.format_exc(), }, ) from exc if not has_template_config: raise HTTPException( status_code=400, detail={ "error": ("Error building Component. Please check if you are importing Component correctly."), }, ) try: if "inputs" in custom_component.template_config: return build_custom_component_template_from_inputs(custom_component, user_id=user_id) frontend_node = CustomComponentFrontendNode(**custom_component.template_config) field_config, custom_instance = run_build_config( custom_component, user_id=user_id, ) entrypoint_args = custom_component.get_function_entrypoint_args add_extra_fields(frontend_node, field_config, entrypoint_args) frontend_node = add_code_field(frontend_node, custom_component._code) add_base_classes(frontend_node, custom_component._get_function_entrypoint_return_type) add_output_types(frontend_node, custom_component._get_function_entrypoint_return_type) reorder_fields(frontend_node, custom_instance._get_field_order()) return frontend_node.to_dict(keep_name=False), custom_instance except Exception as exc: if isinstance(exc, HTTPException): raise raise HTTPException( status_code=400, detail={ "error": (f"Error building Component: {exc}"), "traceback": traceback.format_exc(), }, ) from exc def create_component_template(component): """Create a template for a component.""" component_code = component["code"] component_output_types = component["output_types"] component_extractor = Component(_code=component_code) component_template, component_instance = build_custom_component_template(component_extractor) if not component_template["output_types"] and component_output_types: component_template["output_types"] = component_output_types return component_template, component_instance def build_custom_components(components_paths: list[str]): """Build custom components from the specified paths.""" if not components_paths: return {} logger.info(f"Building custom components from {components_paths}") custom_components_from_file: dict = {} processed_paths = set() for path in components_paths: path_str = str(path) if path_str in processed_paths: continue custom_component_dict = build_custom_component_list_from_path(path_str) if custom_component_dict: category = next(iter(custom_component_dict)) logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") custom_components_from_file = merge_nested_dicts_with_renaming( custom_components_from_file, custom_component_dict ) processed_paths.add(path_str) return custom_components_from_file async def abuild_custom_components(components_paths: list[str]): """Build custom components from the specified paths.""" if not components_paths: return {} logger.info(f"Building custom components from {components_paths}") custom_components_from_file: dict = {} processed_paths = set() for path in components_paths: path_str = str(path) if path_str in processed_paths: continue custom_component_dict = await abuild_custom_component_list_from_path(path_str) if custom_component_dict: category = next(iter(custom_component_dict)) logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") custom_components_from_file = merge_nested_dicts_with_renaming( custom_components_from_file, custom_component_dict ) processed_paths.add(path_str) return custom_components_from_file def update_field_dict( custom_component_instance: "CustomComponent", field_dict: dict, build_config: dict, *, update_field: str | None = None, update_field_value: Any | None = None, call: bool = False, ): """Update the field dictionary by calling options() or value() if they are callable.""" if ( ("real_time_refresh" in field_dict or "refresh_button" in field_dict) and any( ( field_dict.get("real_time_refresh", False), field_dict.get("refresh_button", False), ) ) and call ): try: dd_build_config = dotdict(build_config) custom_component_instance.update_build_config( build_config=dd_build_config, field_value=update_field, field_name=update_field_value, ) build_config = dd_build_config except Exception as exc: msg = f"Error while running update_build_config: {exc}" logger.exception(msg) raise UpdateBuildConfigError(msg) from exc return build_config def sanitize_field_config(field_config: dict | Input): # If any of the already existing keys are in field_config, remove them field_dict = field_config.to_dict() if isinstance(field_config, Input) else field_config for key in [ "name", "field_type", "value", "required", "placeholder", "display_name", "advanced", "show", ]: field_dict.pop(key, None) # Remove field_type and type because they were extracted already field_dict.pop("field_type", None) field_dict.pop("type", None) return field_dict def build_component(component): """Build a single component.""" component_template, component_instance = create_component_template(component) component_name = get_instance_name(component_instance) return component_name, component_template def get_function(code): """Get the function.""" function_name = validate.extract_function_name(code) return validate.create_function(code, function_name) def get_instance_name(instance): name = instance.__class__.__name__ if hasattr(instance, "name") and instance.name: name = instance.name return name