Tai Truong
fix readme
d202ada
import ast
import contextlib
import re
import traceback
from typing import Any
from uuid import UUID
from fastapi import HTTPException
from loguru import logger
from pydantic import BaseModel
from langflow.custom import CustomComponent
from langflow.custom.custom_component.component import Component
from langflow.custom.directory_reader.utils import (
abuild_custom_component_list_from_path,
build_custom_component_list_from_path,
merge_nested_dicts_with_renaming,
)
from langflow.custom.eval import eval_custom_component_code
from langflow.custom.schema import MissingDefault
from langflow.field_typing.range_spec import RangeSpec
from langflow.helpers.custom import format_type
from langflow.schema import dotdict
from langflow.template.field.base import Input
from langflow.template.frontend_node.custom_components import ComponentFrontendNode, CustomComponentFrontendNode
from langflow.type_extraction.type_extraction import extract_inner_type
from langflow.utils import validate
from langflow.utils.util import get_base_classes
class UpdateBuildConfigError(Exception):
    """Raised by ``update_field_dict`` when a component's ``update_build_config`` call fails."""
def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None:
    """Register a display name for every return type on the frontend node.

    ``str`` is registered as ``"Text"``. Any other type is registered under its
    ``__name__`` when it has one, otherwise under the name of its class.

    Args:
        frontend_node: Node that receives each name through ``add_output_type``.
        return_types: Return types to register, processed in order.

    Raises:
        HTTPException: Status 400 as soon as a ``None`` entry is reached; entries
            before it have already been registered.
    """

    def _display_name(candidate):
        # Same precedence as a flat if/elif chain: str, __name__, class name, str().
        if candidate is str:
            return "Text"
        if hasattr(candidate, "__name__"):
            return candidate.__name__
        if hasattr(candidate, "__class__"):
            return candidate.__class__.__name__
        return str(candidate)

    for declared_type in return_types:
        if declared_type is None:
            failure_detail = {
                "error": ("Invalid return type. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            }
            raise HTTPException(status_code=400, detail=failure_detail)
        frontend_node.add_output_type(_display_name(declared_type))
def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: list[str]) -> None:
    """Reorder the template fields of the frontend node to follow ``field_order``.

    Fields named in ``field_order`` come first, in that order; names that match no
    template field are ignored. Remaining fields keep their relative order and are
    appended afterwards. The node's ``field_order`` attribute is set to the given list.

    Args:
        frontend_node: Node whose ``template.fields`` and ``field_order`` are updated.
        field_order: Desired field names; when empty or ``None`` nothing is changed.
    """
    if not field_order:
        return

    current_fields = frontend_node.template.fields
    # Name -> field mapping so each requested name is resolved in O(1).
    fields_by_name = {field.name: field for field in current_fields}
    # Set of requested names so the "leftover" test below is O(1) per field
    # instead of a linear scan of field_order for every field.
    requested_names = set(field_order)

    reordered_fields = [fields_by_name[name] for name in field_order if name in fields_by_name]
    # Add any fields that are not in the field_order list
    reordered_fields.extend(field for field in current_fields if field.name not in requested_names)

    frontend_node.template.fields = reordered_fields
    frontend_node.field_order = field_order
def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None:
    """Register the base classes of every return type on the frontend node.

    For each entry the names returned by ``get_base_classes`` are added through
    ``add_base_class``; for ``str`` the extra name ``"Text"`` is appended first.

    Args:
        frontend_node: Node that receives the base class names.
        return_types: Return types to inspect, processed in order.

    Raises:
        HTTPException: Status 400 as soon as a ``None`` entry is reached; entries
            before it have already been processed.
    """
    for declared_type in return_types:
        if declared_type is None:
            failure_detail = {
                "error": ("Invalid return type. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            }
            raise HTTPException(status_code=400, detail=failure_detail)

        class_names = get_base_classes(declared_type)
        if declared_type is str:
            class_names.append("Text")

        for class_name in class_names:
            frontend_node.add_base_class(class_name)
def extract_type_from_optional(field_type):
    """Unwrap the inner type from a string such as ``"Optional[<type>]"``.

    Parameters:
    field_type (str): The type string to inspect.
    Returns:
    str: The text between the first ``[`` and the final ``]`` when the string
    mentions "optional" (case-insensitive) and ends with a bracketed part;
    otherwise ``field_type`` unchanged.
    """
    if "optional" in field_type.lower():
        bracketed = re.search(r"\[(.*?)\]$", field_type)
        if bracketed:
            return bracketed.group(1)
    return field_type
def get_field_properties(extra_field):
    """Derive ``(name, type, value, required)`` from an argument description.

    Args:
        extra_field: Mapping with a ``"name"`` key and optional ``"type"``
            (defaults to ``"str"``) and ``"default"`` (defaults to ``""``) keys.

    Returns:
        A 4-tuple ``(field_name, field_type, field_value, field_required)``.
        A field is required only when its type does not mention "optional" and
        its default is a ``MissingDefault``; a ``MissingDefault`` default is
        reported as ``None``. For non-required fields the type goes through
        ``extract_type_from_optional``. Any non-``None`` default is passed to
        ``ast.literal_eval`` and kept as-is if that fails.
    """
    name = extra_field["name"]
    declared_type = extra_field.get("type", "str")
    default = extra_field.get("default", "")

    mentions_optional = "optional" in declared_type.lower()
    lacks_default = isinstance(default, MissingDefault)
    is_required = lacks_default and not mentions_optional

    if lacks_default:
        default = None
    if not is_required:
        declared_type = extract_type_from_optional(declared_type)
    if default is not None:
        # Best effort: defaults that are not Python literals stay untouched.
        with contextlib.suppress(Exception):
            default = ast.literal_eval(default)

    return name, declared_type, default, is_required
def process_type(field_type: str):
    """Normalize a field type string.

    Strings starting with ``list``/``List`` are handed to ``extract_inner_type``.
    ``prompt`` and ``code`` (in any letter case) are returned lowercased. Every
    other string is returned unchanged.
    """
    if field_type.startswith(("list", "List")):
        return extract_inner_type(field_type)

    # Prompt and Code may arrive capitalized; only those two are lowercased.
    lowered = field_type.lower()
    return lowered if lowered in {"prompt", "code"} else field_type
def add_new_custom_field(
    *,
    frontend_node: CustomComponentFrontendNode,
    field_name: str,
    field_type: str,
    field_value: Any,
    field_required: bool,
    field_config: dict,
):
    """Build an ``Input`` from the given properties and upsert it into the node template.

    ``field_config`` is mutated: the keys handled here are popped, flags are
    added, and it is finally passed through ``sanitize_field_config`` before
    being expanded into the ``Input`` constructor.

    Args:
        frontend_node: Node whose template receives the field.
        field_name: Name of the field; also the key used for the upsert.
        field_type: Type string; when falsy, ``field_config["type"]`` or
            ``field_config["field_type"]`` is used instead (first non-``None``).
        field_value: Default value, overridden by ``field_config["value"]`` if present.
        field_required: Required flag, overridden by ``field_config["required"]`` if present.
        field_config: Extra configuration for the field.

    Returns:
        The same ``frontend_node`` that was passed in.
    """
    # Check field_config if any of the keys are in it
    # if it is, update the value
    display_name = field_config.pop("display_name", None)
    if not field_type:
        if "type" in field_config and field_config["type"] is not None:
            field_type = field_config.pop("type")
        elif "field_type" in field_config and field_config["field_type"] is not None:
            field_type = field_config.pop("field_type")
    # Must be computed before process_type() rewrites the type string.
    field_contains_list = "list" in field_type.lower()
    field_type = process_type(field_type)
    field_value = field_config.pop("value", field_value)
    field_advanced = field_config.pop("advanced", False)

    if field_type == "Dict":
        field_type = "dict"

    # A bool field without a value defaults to False rather than None.
    if field_type == "bool" and field_value is None:
        field_value = False

    if field_type == "SecretStr":
        field_config["password"] = True
        field_config["load_from_db"] = True
        field_config["input_types"] = ["Text"]

    # If options is a list, then it's a dropdown or multiselect
    # If options is None, then it's a list of strings
    is_list = isinstance(field_config.get("options"), list)
    field_config["is_list"] = is_list or field_config.get("list", False) or field_contains_list

    # "name" is only warned about here; sanitize_field_config() drops it below.
    if "name" in field_config:
        logger.warning("The 'name' key in field_config is used to build the object and can't be changed.")
    required = field_config.pop("required", field_required)
    placeholder = field_config.pop("placeholder", "")

    new_field = Input(
        name=field_name,
        field_type=field_type,
        value=field_value,
        show=True,
        required=required,
        advanced=field_advanced,
        placeholder=placeholder,
        display_name=display_name,
        **sanitize_field_config(field_config),
    )
    frontend_node.template.upsert_field(field_name, new_field)
    # Record the field name among the node's custom fields when that attribute is a dict.
    if isinstance(frontend_node.custom_fields, dict):
        frontend_node.custom_fields[field_name] = None

    return frontend_node
def add_extra_fields(frontend_node, field_config, function_args) -> None:
    """Add one template field per entrypoint argument, plus config-only fields.

    Every argument in ``function_args`` that has a name other than ``self``,
    ``kwargs`` or ``args`` becomes a field, configured by the ``field_config``
    entry of the same name (if any). When the entrypoint accepts ``kwargs`` and
    ``field_config`` has keys that are not argument names, the leftover config
    entries that carry a ``"name"`` key (except ``"code"``) are added as well.

    Args:
        frontend_node: Node that receives the fields.
        field_config: Mapping of field name to its configuration; a shallow copy
            is consumed, the mapping itself is not modified here.
        function_args: Argument descriptions (mappings); nothing happens when empty.
    """
    if not function_args:
        return
    field_config_ = field_config.copy()
    # Skip entries without a "name": the loop below tolerates them, so building
    # this lookup must not raise KeyError for them either.
    function_args_names = {arg["name"] for arg in function_args if "name" in arg}

    for extra_field in function_args:
        if "name" not in extra_field or extra_field["name"] in {
            "self",
            "kwargs",
            "args",
        }:
            continue

        field_name, field_type, field_value, field_required = get_field_properties(extra_field)
        # Consume the matching config so only unmatched entries remain afterwards.
        config = field_config_.pop(field_name, {})
        frontend_node = add_new_custom_field(
            frontend_node=frontend_node,
            field_name=field_name,
            field_type=field_type,
            field_value=field_value,
            field_required=field_required,
            field_config=config,
        )

    # If kwargs is in the function_args and not all field_config keys are in function_args
    # then we need to add the extra fields
    if "kwargs" in function_args_names and not all(key in function_args_names for key in field_config):
        for field_name, config in field_config_.items():
            # NOTE(review): this membership test runs before the BaseModel -> dict
            # conversion below; confirm it behaves as intended for model configs.
            if "name" not in config or field_name == "code":
                continue
            config_ = config.model_dump() if isinstance(config, BaseModel) else config
            field_name_, field_type, field_value, field_required = get_field_properties(extra_field=config_)
            frontend_node = add_new_custom_field(
                frontend_node=frontend_node,
                field_name=field_name_,
                field_type=field_type,
                field_value=field_value,
                field_required=field_required,
                field_config=config_,
            )
def get_field_dict(field: Input | dict):
    """Return ``field`` in dictionary form.

    An ``Input`` is dumped (by alias, without ``None`` values) and wrapped in a
    ``dotdict``; anything else is returned unchanged.
    """
    if not isinstance(field, Input):
        return field
    dumped = field.model_dump(by_alias=True, exclude_none=True)
    return dotdict(dumped)
def run_build_inputs(
    custom_component: Component,
):
    """Call ``build_inputs`` on the component and return its result.

    Raises:
        HTTPException: Status 500 carrying ``str(exc)`` when ``build_inputs``
            raises; the error is logged first.
    """
    try:
        built_inputs = custom_component.build_inputs()
    except Exception as exc:
        logger.exception("Error running build inputs")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return built_inputs
def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None):
    """Evaluate the component's code and instantiate the resulting class.

    Args:
        custom_component: Component whose ``_code`` string is evaluated.
        user_id: Passed to the class as ``_user_id``.

    Returns:
        An instance created with ``_user_id=user_id`` and ``_code`` set to the
        component's code.

    Raises:
        HTTPException: Status 400 when ``_code`` is ``None`` or not a string, or
            when evaluating the code fails.
        Exception: Whatever the class constructor raises is logged and re-raised.
    """
    code = custom_component._code

    # Reject unusable code up front.
    problem = None
    if code is None:
        problem = "Code is None"
    elif not isinstance(code, str):
        problem = "Invalid code type"
    if problem is not None:
        msg = f"Invalid type conversion: {problem}. Please check your code and try again."
        logger.error(msg)
        raise HTTPException(
            status_code=400,
            detail={"error": msg},
        )

    try:
        component_class = eval_custom_component_code(code)
    except Exception as exc:
        logger.exception("Error while evaluating custom component code")
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Invalid type conversion. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    try:
        return component_class(_user_id=user_id, _code=code)
    except Exception as exc:
        logger.exception("Error while instantiating custom component")
        # Errors that carry a detail dict with a traceback get it logged as well.
        if hasattr(exc, "detail") and "traceback" in exc.detail:
            logger.error(exc.detail["traceback"])
        raise
def run_build_config(
    custom_component: CustomComponent,
    user_id: str | UUID | None = None,
) -> tuple[dict, CustomComponent]:
    """Build the field configuration for a custom component.

    The component's code is evaluated, the resulting class is instantiated with
    ``_user_id=user_id`` and its ``build_config()`` is called. Each entry is
    converted with ``get_field_dict`` and a ``RangeSpec`` stored under
    ``"rangeSpec"`` is replaced by its ``model_dump()``.

    Returns:
        ``(build_config, custom_instance)``.

    Raises:
        HTTPException: Status 400 when ``_code`` is ``None`` or not a string, or
            when evaluating the code fails.
        Exception: Errors from instantiation or from building the configuration
            are logged and re-raised.
    """
    code = custom_component._code

    # Reject unusable code up front.
    problem = None
    if code is None:
        problem = "Code is None"
    elif not isinstance(code, str):
        problem = "Invalid code type"
    if problem is not None:
        msg = f"Invalid type conversion: {problem}. Please check your code and try again."
        logger.error(msg)
        raise HTTPException(
            status_code=400,
            detail={"error": msg},
        )

    try:
        component_class = eval_custom_component_code(code)
    except Exception as exc:
        logger.exception("Error while evaluating custom component code")
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Invalid type conversion. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    try:
        custom_instance = component_class(_user_id=user_id)
        build_config: dict = custom_instance.build_config()

        # Iterate over a copy because entries are reassigned while looping.
        for key, raw_field in build_config.copy().items():
            # Users may supply either an Input or a plain dict with the same keys.
            as_dict = get_field_dict(raw_field)
            if "rangeSpec" in as_dict and isinstance(as_dict["rangeSpec"], RangeSpec):
                as_dict["rangeSpec"] = as_dict["rangeSpec"].model_dump()
            build_config[key] = as_dict
    except Exception as exc:
        logger.exception("Error while building field config")
        # Errors that carry a detail dict with a traceback get it logged as well.
        if hasattr(exc, "detail") and "traceback" in exc.detail:
            logger.error(exc.detail["traceback"])
        raise

    return build_config, custom_instance
def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code):
    """Add a required, advanced, multiline ``code`` field holding ``raw_code``.

    Args:
        frontend_node: Node whose template receives the field.
        raw_code: Value stored in the field.

    Returns:
        The same ``frontend_node`` that was passed in.
    """
    frontend_node.template.add_field(
        Input(
            name="code",
            field_type="code",
            value=raw_code,
            required=True,
            advanced=True,
            dynamic=True,
            multiline=True,
            is_list=False,
            password=False,
            placeholder="",
        )
    )
    return frontend_node
def build_custom_component_template_from_inputs(
    custom_component: Component | CustomComponent, user_id: str | UUID | None = None
):
    """Build the template of a component that declares its fields through inputs.

    Here the list of inputs plays the role that ``build_config`` and the
    entrypoint arguments play for other components.

    Returns:
        ``(frontend_node.to_dict(keep_name=False), component_instance)``.
    """
    cc_instance = get_component_instance(custom_component, user_id=user_id)
    template_config = cc_instance.get_template_config(cc_instance)
    frontend_node = add_code_field(
        ComponentFrontendNode.from_inputs(**template_config),
        custom_component._code,
    )

    # Outputs without explicit types get them from the return type of their method.
    for output in frontend_node.outputs:
        if not output.types:
            method_return_types = cc_instance.get_method_return_type(output.method)
            output.add_types([format_type(return_type) for return_type in method_return_types])
            output.set_selected()

    # Validate that there is not name overlap between inputs and outputs
    frontend_node.validate_component()
    # ! This should be removed when we have a better way to handle this
    frontend_node.set_base_classes_from_outputs()
    reorder_fields(frontend_node, cc_instance._get_field_order())

    serialized_node = frontend_node.to_dict(keep_name=False)
    return serialized_node, cc_instance
def build_custom_component_template(
    custom_component: CustomComponent,
    user_id: str | UUID | None = None,
) -> tuple[dict[str, Any], CustomComponent | Component]:
    """Build a custom component template.

    Components whose ``template_config`` contains ``"inputs"`` are delegated to
    ``build_custom_component_template_from_inputs``. Otherwise the node is built
    from ``template_config``, the result of ``run_build_config`` and the
    entrypoint arguments and return types.

    Returns:
        ``(template_dict, component_instance)``.

    Raises:
        HTTPException: Status 400 when the component has no ``template_config``
            or when any step fails; an ``HTTPException`` raised by a step is
            propagated unchanged.
    """
    try:
        has_template_config = hasattr(custom_component, "template_config")
    except Exception as exc:
        # hasattr() only swallows AttributeError; anything else surfaces here.
        raise HTTPException(
            status_code=400,
            detail={
                "error": (f"Error building Component: {exc}"),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    if not has_template_config:
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Error building Component. Please check if you are importing Component correctly."),
            },
        )

    try:
        template_config = custom_component.template_config
        if "inputs" in template_config:
            return build_custom_component_template_from_inputs(custom_component, user_id=user_id)

        frontend_node = CustomComponentFrontendNode(**custom_component.template_config)
        field_config, custom_instance = run_build_config(custom_component, user_id=user_id)

        add_extra_fields(frontend_node, field_config, custom_component.get_function_entrypoint_args)
        frontend_node = add_code_field(frontend_node, custom_component._code)

        add_base_classes(frontend_node, custom_component._get_function_entrypoint_return_type)
        add_output_types(frontend_node, custom_component._get_function_entrypoint_return_type)
        reorder_fields(frontend_node, custom_instance._get_field_order())

        return frontend_node.to_dict(keep_name=False), custom_instance
    except HTTPException:
        # Already in the shape callers expect.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail={
                "error": (f"Error building Component: {exc}"),
                "traceback": traceback.format_exc(),
            },
        ) from exc
def create_component_template(component):
    """Build the template and instance for a component description.

    Args:
        component: Mapping with ``"code"`` and ``"output_types"`` keys.

    Returns:
        ``(component_template, component_instance)``. When the built template has
        no ``"output_types"`` and the description provides some, the description's
        value is used.
    """
    source_code = component["code"]
    declared_output_types = component["output_types"]

    template, instance = build_custom_component_template(Component(_code=source_code))

    if declared_output_types and not template["output_types"]:
        template["output_types"] = declared_output_types
    return template, instance
def build_custom_components(components_paths: list[str]):
    """Build and merge the custom components found under the given paths.

    Each distinct path (compared by ``str(path)``) is processed once. Non-empty
    results are merged with ``merge_nested_dicts_with_renaming``.

    Returns:
        The merged dictionary, or ``{}`` when ``components_paths`` is empty.
    """
    if not components_paths:
        return {}

    logger.info(f"Building custom components from {components_paths}")
    merged_components: dict = {}
    seen_paths = set()

    for raw_path in components_paths:
        path_str = str(raw_path)
        if path_str in seen_paths:
            continue
        seen_paths.add(path_str)

        custom_component_dict = build_custom_component_list_from_path(path_str)
        if not custom_component_dict:
            continue

        # Only the first category of the result is reported in the log line.
        category = next(iter(custom_component_dict))
        logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
        merged_components = merge_nested_dicts_with_renaming(merged_components, custom_component_dict)

    return merged_components
async def abuild_custom_components(components_paths: list[str]):
    """Asynchronously build and merge the custom components found under the given paths.

    Each distinct path (compared by ``str(path)``) is processed once, awaiting
    ``abuild_custom_component_list_from_path``. Non-empty results are merged with
    ``merge_nested_dicts_with_renaming``.

    Returns:
        The merged dictionary, or ``{}`` when ``components_paths`` is empty.
    """
    if not components_paths:
        return {}

    logger.info(f"Building custom components from {components_paths}")
    merged_components: dict = {}
    seen_paths = set()

    for raw_path in components_paths:
        path_str = str(raw_path)
        if path_str in seen_paths:
            continue
        seen_paths.add(path_str)

        custom_component_dict = await abuild_custom_component_list_from_path(path_str)
        if not custom_component_dict:
            continue

        # Only the first category of the result is reported in the log line.
        category = next(iter(custom_component_dict))
        logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
        merged_components = merge_nested_dicts_with_renaming(merged_components, custom_component_dict)

    return merged_components
def update_field_dict(
custom_component_instance: "CustomComponent",
field_dict: dict,
build_config: dict,
*,
update_field: str | None = None,
update_field_value: Any | None = None,
call: bool = False,
):
"""Update the field dictionary by calling options() or value() if they are callable."""
if (
("real_time_refresh" in field_dict or "refresh_button" in field_dict)
and any(
(
field_dict.get("real_time_refresh", False),
field_dict.get("refresh_button", False),
)
)
and call
):
try:
dd_build_config = dotdict(build_config)
custom_component_instance.update_build_config(
build_config=dd_build_config,
field_value=update_field,
field_name=update_field_value,
)
build_config = dd_build_config
except Exception as exc:
msg = f"Error while running update_build_config: {exc}"
logger.exception(msg)
raise UpdateBuildConfigError(msg) from exc
return build_config
def sanitize_field_config(field_config: dict | Input):
    """Drop the keys that are passed to ``Input`` explicitly elsewhere.

    Args:
        field_config: A dict, which is modified in place, or an ``Input``, whose
            ``to_dict()`` result is used instead.

    Returns:
        The dictionary without ``name``, ``field_type``, ``value``, ``required``,
        ``placeholder``, ``display_name``, ``advanced``, ``show`` and ``type``.
    """
    config = field_config.to_dict() if isinstance(field_config, Input) else field_config

    # Keys already supplied as explicit arguments, plus the type keys that were
    # extracted before this point.
    reserved_keys = (
        "name",
        "field_type",
        "value",
        "required",
        "placeholder",
        "display_name",
        "advanced",
        "show",
        "type",
    )
    for reserved_key in reserved_keys:
        config.pop(reserved_key, None)
    return config
def build_component(component):
    """Build a single component and return ``(component_name, component_template)``."""
    template, instance = create_component_template(component)
    return get_instance_name(instance), template
def get_function(code):
    """Create a function from ``code`` using the name ``validate`` extracts from it."""
    return validate.create_function(code, validate.extract_function_name(code))
def get_instance_name(instance):
    """Return the instance's ``name`` attribute when it is truthy, else its class name."""
    if hasattr(instance, "name") and instance.name:
        return instance.name
    return instance.__class__.__name__