Tai Truong
fix readme
d202ada
raw
history blame
15 kB
import difflib
import importlib
import inspect
import json
import re
from functools import wraps
from pathlib import Path
from typing import Any
from docstring_parser import parse
from langflow.logging.logger import logger
from langflow.schema import Data
from langflow.services.deps import get_settings_service
from langflow.template.frontend_node.constants import FORCE_SHOW_FIELDS
from langflow.utils import constants
def unescape_string(s: str):
    """Turn every literal backslash-n sequence in ``s`` into a real newline."""
    # Splitting on the two-character escape and re-joining with a newline
    # substitutes every occurrence.
    pieces = s.split("\\n")
    return "\n".join(pieces)
def remove_ansi_escape_codes(text):
    """Return ``text`` with ANSI escape sequences removed.

    A sequence is matched as ESC, ``[``, any run of digits and semicolons,
    and one terminating ASCII letter.
    """
    ansi_sequence = re.compile(r"\x1b\[[0-9;]*[a-zA-Z]")
    return ansi_sequence.sub("", text)
def build_template_from_function(name: str, type_to_loader_dict: dict, *, add_function: bool = False):
    """Build a frontend template for the class a loader function returns.

    Every value of ``type_to_loader_dict`` must carry a ``return`` annotation.
    The first entry whose annotated class is called ``name`` is described.

    Args:
        name: ``__name__`` of the annotated return class to look for.
        type_to_loader_dict: Mapping from a type key to a loader callable.
        add_function: When true, ``"Callable"`` is appended to the base
            classes so the output may also be treated as a function.

    Returns:
        A dict with the keys ``"template"`` (the formatted field
        descriptions), ``"description"`` (short description from the class
        docstring, or ``""``) and ``"base_classes"``.

    Raises:
        ValueError: If no loader is annotated with a class called ``name``.
    """
    for _type, loader in type_to_loader_dict.items():
        class_ = loader.__annotations__["return"]
        if class_.__name__ != name:
            continue

        # Get the docstring
        docs = parse(class_.__doc__)
        # ``docs.params`` is a list of parsed parameters, not a mapping, so
        # index it by argument name before looking up placeholders.
        param_docs = {param.arg_name: param.description or "" for param in docs.params}

        variables = {"_type": _type}
        for field_name, field_info in class_.model_fields.items():
            if field_name == "callback_manager":
                continue
            field_vars = {}
            for attr_name, attr_value in field_info.__repr_args__():
                if attr_name == "default_factory":
                    # Best effort: a factory that cannot be resolved or
                    # called yields a ``None`` default instead of failing.
                    try:
                        field_vars["default"] = get_default_factory(
                            module=class_.__base__.__module__, function=attr_value
                        )
                    except Exception:  # noqa: BLE001
                        logger.opt(exception=True).debug(f"Error getting default factory for {attr_value}")
                        field_vars["default"] = None
                elif attr_name != "name":
                    field_vars[attr_name] = attr_value
            field_vars["placeholder"] = param_docs.get(field_name, "")
            variables[field_name] = field_vars

        # Adding function to base classes to allow
        # the output to be a function
        base_classes = get_base_classes(class_)
        if add_function:
            base_classes.append("Callable")

        return {
            "template": format_dict(variables, name),
            "description": docs.short_description or "",
            "base_classes": base_classes,
        }

    # No loader is annotated with the requested class.
    msg = f"{name} not found"
    raise ValueError(msg)
def build_template_from_method(
    class_name: str,
    method_name: str,
    type_to_cls_dict: dict,
    *,
    add_function: bool = False,
):
    """Build a frontend template from the signature of a method on a class.

    Args:
        class_name: ``__name__`` of the class to look up among the values of
            ``type_to_cls_dict``; the first match is used.
        method_name: Attribute name of the method whose parameters become
            the template fields.
        type_to_cls_dict: Mapping from a type key to a class.
        add_function: When true, ``"Callable"`` is appended to the base
            classes so the output may also be treated as a function.

    Returns:
        A dict with the keys ``"template"`` (the formatted parameter
        descriptions), ``"description"`` (short description from the method
        docstring, or ``""``) and ``"base_classes"``.

    Raises:
        ValueError: If no class is called ``class_name``, or the matching
            class has no attribute ``method_name``.
    """
    # Sentinel marking "no default" / "no annotation". It is compared by
    # identity so defaults with a custom ``__eq__`` cannot confuse the check.
    empty = inspect.Parameter.empty

    for _type, class_ in type_to_cls_dict.items():
        if class_.__name__ != class_name:
            continue

        # Check if the method exists in this class
        if not hasattr(class_, method_name):
            msg = f"Method {method_name} not found in class {class_name}"
            raise ValueError(msg)

        method = getattr(class_, method_name)
        docs = parse(method.__doc__)
        params = inspect.signature(method).parameters

        # One entry per parameter, skipping the receiver and the
        # conventional catch-all names.
        variables = {"_type": _type}
        for param_name, param in params.items():
            if param_name in {"self", "kwargs", "args"}:
                continue
            has_default = param.default is not empty
            variables[param_name] = {
                "default": param.default if has_default else None,
                "type": param.annotation if param.annotation is not empty else None,
                "required": not has_default,
            }

        base_classes = get_base_classes(class_)
        # Adding function to base classes to allow the output to be a function
        if add_function:
            base_classes.append("Callable")

        return {
            "template": format_dict(variables, class_name),
            "description": docs.short_description or "",
            "base_classes": base_classes,
        }

    # No class in the mapping carries the requested name.
    msg = f"{class_name} not found."
    raise ValueError(msg)
def get_base_classes(cls):
    """Get the names of a class and of its (transitive) base classes.

    These are used to determine the output of the nodes.

    Bases whose ``__module__`` contains ``"pydantic"`` or ``"abc"`` are
    skipped together with everything above them.

    Returns:
        A list of unique class names in a deterministic order: each direct
        base followed by its own ancestors, in ``__bases__`` order, with
        ``cls.__name__`` last.
    """
    names = []
    for base in getattr(cls, "__bases__", ()):
        if any(marker in base.__module__ for marker in ("pydantic", "abc")):
            continue
        # The base itself first, then whatever it inherits from; the
        # membership test keeps the first occurrence of each name.
        for candidate in (base.__name__, *get_base_classes(base)):
            if candidate not in names:
                names.append(candidate)
    if cls.__name__ not in names:
        names.append(cls.__name__)
    return names
def get_default_factory(module: str, function: str):
    """Resolve a ``<function NAME>`` representation and call the factory.

    Args:
        module: Dotted name of the module that defines the factory.
        function: Text expected to contain ``<function NAME>``.

    Returns:
        The result of calling ``NAME`` from ``module`` with no arguments, or
        ``None`` when ``function`` contains no such pattern.
    """
    found = re.search(r"<function (\w+)>", function)
    if found is None:
        return None
    # The module is imported only once a factory name has been recognised.
    factory = getattr(importlib.import_module(module), found.group(1))
    return factory()
def update_verbose(d: dict, *, new_value: bool) -> dict:
    """Set every non-dict ``"verbose"`` entry, at any nesting depth, to ``new_value``.

    Args:
        d: the dictionary to update; it is modified in place
        new_value: the value to store under each ``"verbose"`` key

    Returns:
        The same dictionary object that was passed in.
    """
    for key in d:
        current = d[key]
        if isinstance(current, dict):
            # Nested dicts are descended into, even when stored under "verbose".
            update_verbose(current, new_value=new_value)
            continue
        if key == "verbose":
            d[key] = new_value
    return d
def sync_to_async(func):
    """Decorator exposing a synchronous callable as a coroutine function.

    The wrapped callable is invoked directly inside the coroutine body; no
    thread or executor is involved.
    """

    async def _call(*args, **kwargs):
        result = func(*args, **kwargs)
        return result

    # Carry over the metadata (name, docstring, ...) of the wrapped callable.
    return wraps(func)(_call)
def format_dict(dictionary: dict[str, Any], class_name: str | None = None) -> dict[str, Any]:
    """Normalise the field descriptions of a template dictionary in place.

    The ``"_type"`` entry and any field whose type mentions ``BaseModel`` are
    left untouched. Every other field gets its type string normalised and the
    ``show``, ``password`` and ``multiline`` flags set, followed by the
    key-specific adjustments for ``dict_``, ``headers`` and model options.

    Args:
        dictionary: Mapping of field name to field description.
        class_name: Class name forwarded to ``add_options_to_field``.

    Returns:
        The same ``dictionary`` object, after modification.
    """
    for field_name, field in dictionary.items():
        if field_name == "_type":
            continue

        raw_type: str | type = get_type(field)
        if "BaseModel" in str(raw_type):
            continue

        # Type normalisation pipeline; each step feeds the next one.
        normalized = remove_optional_wrapper(raw_type)
        normalized = check_list_type(normalized, field)
        normalized = replace_mapping_with_dict(normalized)
        normalized = get_type_from_union_literal(normalized)

        field["type"] = get_formatted_type(field_name, normalized)
        field["show"] = should_show_field(field, field_name)
        field["password"] = is_password_field(field_name)
        field["multiline"] = is_multiline_field(field_name)

        if field_name == "dict_":
            set_dict_file_attributes(field)

        replace_default_value_with_actual(field)

        if field_name == "headers":
            set_headers_value(field)

        add_options_to_field(field, class_name, field_name)

    return dictionary
# "Union[Literal['f-string'], Literal['jinja2']]" -> "str"
def get_type_from_union_literal(union_literal: str) -> str:
    """Map any type string that mentions ``Literal`` to ``"str"``.

    Type strings without ``Literal`` are returned unchanged.
    """
    # Literal types are treated as plain strings.
    return "str" if "Literal" in union_literal else union_literal
def get_type(value: Any) -> str | type:
"""Retrieves the type value from the dictionary.
Returns:
The type value.
"""
# get "type" or "annotation" from the value
type_ = value.get("type") or value.get("annotation")
return type_ if isinstance(type_, str) else type_.__name__
def remove_optional_wrapper(type_: str | type) -> str:
"""Removes the 'Optional' wrapper from the type string.
Returns:
The type string with the 'Optional' wrapper removed.
"""
if isinstance(type_, type):
type_ = str(type_)
if "Optional" in type_:
type_ = type_.replace("Optional[", "")[:-1]
return type_
def check_list_type(type_: str, value: dict[str, Any]) -> str:
    """Flag list-like types on ``value`` and strip their wrapper from the type.

    A type counts as list-like when its text contains ``List``, ``Sequence``
    or ``Set`` anywhere. In that case every ``List[``, ``Sequence[`` and
    ``Set[`` is removed and the final character is dropped.

    NOTE(review): the detection is a plain substring test, so a bare name
    such as ``"List"`` is also flagged and loses its last character.

    Returns:
        The modified type string; ``value["list"]`` is set to the flag.
    """
    wrappers = ("List", "Sequence", "Set")

    is_list = False
    for wrapper in wrappers:
        if wrapper in type_:
            is_list = True
            break

    if is_list:
        for wrapper in wrappers:
            type_ = type_.replace(f"{wrapper}[", "")
        type_ = type_[:-1]

    value["list"] = is_list
    return type_
def replace_mapping_with_dict(type_: str) -> str:
    """Substitute every ``Mapping`` in the type string with ``dict``.

    Returns:
        The modified type string (unchanged when ``Mapping`` does not occur).
    """
    # str.replace leaves the text untouched when there is nothing to replace,
    # so no membership check is needed first.
    return type_.replace("Mapping", "dict")
def get_formatted_type(key: str, type_: str) -> str:
    """Pick the displayed type for a field, with overrides for known keys.

    Returns:
        ``"Tool"`` for ``allowed_tools``, ``"int"`` for ``max_value_length``,
        and ``type_`` itself for every other key.
    """
    forced_types = {
        "allowed_tools": "Tool",
        "max_value_length": "int",
    }
    return forced_types.get(key, type_)
def should_show_field(value: dict[str, Any], key: str) -> bool:
    """Decide whether a field is visible.

    A field is shown when it is required (except ``input_variables``), when
    its key is listed in ``FORCE_SHOW_FIELDS``, or when its lower-cased key
    contains ``password``, ``token``, ``api`` or ``key``.

    Returns:
        True if the field should be shown, False otherwise.
    """
    if value["required"] and key != "input_variables":
        return True
    if key in FORCE_SHOW_FIELDS:
        return True
    return any(text in key.lower() for text in ["password", "token", "api", "key"])
def is_password_field(key: str) -> bool:
    """Tell whether a key should be treated as a password field.

    The match is a case-insensitive substring test against ``password``,
    ``token``, ``api`` and ``key``.

    Returns:
        True if the field is a password field, False otherwise.
    """
    lowered = key.lower()
    for marker in ("password", "token", "api", "key"):
        if marker in lowered:
            return True
    return False
def is_multiline_field(key: str) -> bool:
    """Tell whether a key belongs to the fixed set of multiline fields.

    Returns:
        True if the field is a multiline field, False otherwise.
    """
    multiline_keys = frozenset(
        (
            "suffix",
            "prefix",
            "template",
            "examples",
            "code",
            "headers",
            "format_instructions",
        )
    )
    return key in multiline_keys
def set_dict_file_attributes(value: dict[str, Any]) -> None:
    """Mark the field as a file input that accepts JSON and YAML files."""
    value.update(
        {
            "type": "file",
            "fileTypes": [".json", ".yaml", ".yml"],
        }
    )
def replace_default_value_with_actual(value: dict[str, Any]) -> None:
    """Move the ``"default"`` entry, when present, to the ``"value"`` key."""
    if "default" not in value:
        return
    value["value"] = value.pop("default")
def set_headers_value(value: dict[str, Any]) -> None:
    """Store the example Authorization header JSON text under ``"value"``."""
    example_headers = '{"Authorization": "Bearer <token>"}'
    value["value"] = example_headers
def add_options_to_field(value: dict[str, Any], class_name: str | None, key: str) -> None:
    """Attach the selectable model names to a ``model_name`` field.

    Nothing happens unless ``key`` is ``"model_name"`` and ``class_name`` is
    one of the classes with a known model list. Otherwise the list is stored
    under ``"options"``, ``"list"`` is set to True and the first model becomes
    the field's ``"value"``.
    """
    options_map = {
        "OpenAI": constants.OPENAI_MODELS,
        "ChatOpenAI": constants.CHAT_OPENAI_MODELS,
        "Anthropic": constants.ANTHROPIC_MODELS,
        "ChatAnthropic": constants.ANTHROPIC_MODELS,
    }
    if class_name not in options_map or key != "model_name":
        return

    models = options_map[class_name]
    value["options"] = models
    value["list"] = True
    value["value"] = models[0]
def build_loader_repr_from_data(data: list[Data]) -> str:
    """Summarise a list of data items as a short human-readable string.

    Args:
        data (List[Data]): Items whose ``text`` attribute is measured.

    Returns:
        str: ``"0 data"`` when ``data`` is empty; otherwise the item count,
        the average ``text`` length truncated to an int, and the first three
        items.
    """
    if not data:
        return "0 data"

    count = len(data)
    total_chars = sum(len(item.text) for item in data)
    avg_length = total_chars / count
    # Same text as a three-line literal: the count line, an empty line, the
    # average line and the preview line.
    return f"{count} data\n\nAvg. Data Length (characters): {int(avg_length)}\nData: {data[:3]}..."
def update_settings(
    *,
    config: str | None = None,
    cache: str | None = None,
    dev: bool = False,
    remove_api_keys: bool = False,
    components_path: Path | None = None,
    store: bool = True,
    auto_saving: bool = True,
    auto_saving_interval: int = 1000,
    health_check_max_retries: int = 5,
    max_file_size_upload: int = 100,
) -> None:
    """Apply the given overrides to the settings held by the settings service.

    Args:
        config: YAML file to load settings from; ``dev`` is forwarded to the
            loader. Skipped when falsy.
        cache: Cache setting; applied only when truthy.
        dev: Passed to ``update_from_yaml`` together with ``config``.
        remove_api_keys: Applied only when truthy.
        components_path: Component path to add; applied only when truthy.
        store: Written (as False) only when falsy.
        auto_saving: Written (as False) only when falsy.
        auto_saving_interval: Applied unless None.
        health_check_max_retries: Applied unless None.
        max_file_size_upload: Applied unless None.
    """
    from langflow.services.utils import initialize_settings_service

    # Original note: check for database_url in the environment variables.
    initialize_settings_service()
    settings_service = get_settings_service()

    if config:
        logger.debug(f"Loading settings from {config}")
        settings_service.settings.update_from_yaml(config, dev=dev)

    if remove_api_keys:
        logger.debug(f"Setting remove_api_keys to {remove_api_keys}")
        settings_service.settings.update_settings(remove_api_keys=remove_api_keys)

    if cache:
        logger.debug(f"Setting cache to {cache}")
        settings_service.settings.update_settings(cache=cache)

    if components_path:
        logger.debug(f"Adding component path {components_path}")
        settings_service.settings.update_settings(components_path=components_path)

    # Flags that default to on are only written when they are switched off.
    for flag_name, enabled in (("store", store), ("auto_saving", auto_saving)):
        if not enabled:
            logger.debug(f"Setting {flag_name} to False")
            settings_service.settings.update_settings(**{flag_name: False})

    # Numeric options are written whenever a value was supplied.
    numeric_options = (
        ("auto_saving_interval", auto_saving_interval),
        ("health_check_max_retries", health_check_max_retries),
        ("max_file_size_upload", max_file_size_upload),
    )
    for option_name, option_value in numeric_options:
        if option_value is not None:
            logger.debug(f"Setting {option_name} to {option_value}")
            settings_service.settings.update_settings(**{option_name: option_value})
def is_class_method(func, cls):
    """Check if a function is a class method.

    True only when ``func`` is a bound method and the object it is bound to
    is ``cls.__class__``.
    """
    if not inspect.ismethod(func):
        return False
    bound_to = func.__self__
    return bound_to is cls.__class__
def escape_json_dump(edge_dict):
    """Serialise ``edge_dict`` to JSON and swap every double quote for ``œ``."""
    serialized = json.dumps(edge_dict)
    return "œ".join(serialized.split('"'))
def find_closest_match(string: str, list_of_strings: list[str]) -> str | None:
"""Find the closest match in a list of strings."""
closest_match = difflib.get_close_matches(string, list_of_strings, n=1, cutoff=0.2)
if closest_match:
return closest_match[0]
return None