Tai Truong
fix readme
d202ada
import ast
import contextlib
import importlib
import warnings
from types import FunctionType
from typing import Optional, Union
from langchain_core._api.deprecation import LangChainDeprecationWarning
from loguru import logger
from pydantic import ValidationError
from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES
def add_type_ignores() -> None:
    """Make sure ``ast.TypeIgnore`` exists.

    When the running interpreter's ``ast`` module has no ``TypeIgnore``
    attribute, a minimal field-less ``ast.AST`` subclass is installed under
    that name. Otherwise nothing happens.
    """
    if hasattr(ast, "TypeIgnore"):
        return

    class TypeIgnore(ast.AST):
        # No child fields: the stand-in only needs to exist as a node type.
        _fields = ()

    ast.TypeIgnore = TypeIgnore  # type: ignore[assignment, misc]
def _log_debug_exception(message):
    """Log ``message`` at debug level, attaching the active exception when the logger supports ``opt``."""
    if hasattr(logger, "opt"):
        logger.opt(exception=True).debug(message)
    else:
        logger.debug(message)


def validate_code(code):
    """Check a code string for unresolvable imports and broken function definitions.

    The code is parsed, every top-level ``import x`` is attempted, and every
    top-level ``def`` statement is executed (the definition only, not a call).

    NOTE(review): this imports modules and executes ``def`` statements taken
    from ``code`` (decorators and default-argument expressions run); confirm
    callers only pass code they are willing to execute.

    :param code: Python source code as a string
    :return: ``{"imports": {"errors": [...]}, "function": {"errors": [...]}}``
        where each list holds the string form of the exceptions encountered
    """
    errors = {"imports": {"errors": []}, "function": {"errors": []}}

    # Parse the code string into an abstract syntax tree (AST)
    try:
        tree = ast.parse(code)
    except Exception as e:  # noqa: BLE001
        _log_debug_exception("Error parsing code")
        errors["function"]["errors"].append(str(e))
        return errors

    # Add a dummy type_ignores field to the AST
    add_type_ignores()
    tree.type_ignores = []

    # Evaluate the import statements
    for node in tree.body:
        if not isinstance(node, ast.Import):
            continue
        for alias in node.names:
            try:
                importlib.import_module(alias.name)
            except ModuleNotFoundError as e:
                errors["imports"]["errors"].append(str(e))

    # Evaluate the function definitions
    for node in tree.body:
        if not isinstance(node, ast.FunctionDef):
            continue
        code_obj = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
        try:
            exec(code_obj)
        except Exception as e:  # noqa: BLE001
            # Same guarded logging as the parse-error path above.
            _log_debug_exception("Error executing function code")
            errors["function"]["errors"].append(str(e))

    return errors
def eval_function(function_string: str):
    """Execute ``function_string`` in a fresh namespace and return the first function it defines.

    Only plain Python functions whose code object was compiled from the string
    (``co_filename == "<string>"``) are considered.

    Raises:
        ValueError: If executing the string leaves no such function in the namespace.
    """
    scope: dict = {}
    exec(function_string, scope)

    for candidate in scope.values():
        if isinstance(candidate, FunctionType) and candidate.__code__.co_filename == "<string>":
            return candidate

    msg = "Function string does not contain a function"
    raise ValueError(msg)
def execute_function(code, function_name, *args, **kwargs):
    """Define ``function_name`` from ``code`` and call it with the given arguments.

    Top-level ``import`` statements in ``code`` are resolved into the globals
    the function will run with; the function definition itself is then
    compiled and executed in those globals.

    NOTE(review): this executes code supplied by the caller via ``exec``;
    confirm callers only pass code they are willing to run.

    :param code: Python source containing the function definition
    :param function_name: Name of the top-level function to run
    :param args: Positional arguments forwarded to the function
    :param kwargs: Keyword arguments forwarded to the function
    :return: Whatever the function returns
    :raises ModuleNotFoundError: If an imported module cannot be found
    :raises StopIteration: If ``code`` has no top-level function named ``function_name``
    :raises ValueError: If executing the function definition fails
    """
    add_type_ignores()

    module = ast.parse(code)
    exec_globals = globals().copy()

    for node in module.body:
        if not isinstance(node, ast.Import):
            continue
        for alias in node.names:
            try:
                imported = importlib.import_module(alias.name)
                if alias.asname:
                    # ``import a.b as c`` binds ``c`` to the submodule itself.
                    exec_globals[alias.asname] = imported
                else:
                    # ``import a.b`` binds the top-level package ``a``.
                    top_level = alias.name.partition(".")[0]
                    exec_globals[top_level] = importlib.import_module(top_level)
            except ModuleNotFoundError as e:
                msg = f"Module {alias.name} not found. Please install it and try again."
                raise ModuleNotFoundError(msg) from e

    function_code = next(
        node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name
    )
    function_code.parent = None
    code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "<string>", "exec")

    try:
        # Define the function straight into exec_globals. Routing it through
        # ``locals()`` is unreliable: since Python 3.13 each ``locals()`` call in
        # a function returns an independent snapshot, so the definition is lost.
        exec(code_obj, exec_globals)
    except Exception as exc:
        msg = "Function string does not contain a function"
        raise ValueError(msg) from exc

    return exec_globals[function_name](*args, **kwargs)
def create_function(code, function_name):
if not hasattr(ast, "TypeIgnore"):
class TypeIgnore(ast.AST):
_fields = ()
ast.TypeIgnore = TypeIgnore
module = ast.parse(code)
exec_globals = globals().copy()
for node in module.body:
if isinstance(node, ast.Import | ast.ImportFrom):
for alias in node.names:
try:
if isinstance(node, ast.ImportFrom):
module_name = node.module
exec_globals[alias.asname or alias.name] = getattr(
importlib.import_module(module_name), alias.name
)
else:
module_name = alias.name
exec_globals[alias.asname or alias.name] = importlib.import_module(module_name)
except ModuleNotFoundError as e:
msg = f"Module {alias.name} not found. Please install it and try again."
raise ModuleNotFoundError(msg) from e
function_code = next(
node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name
)
function_code.parent = None
code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "<string>", "exec")
with contextlib.suppress(Exception):
exec(code_obj, exec_globals, locals())
exec_globals[function_name] = locals()[function_name]
# Return a function that imports necessary modules and calls the target function
def wrapped_function(*args, **kwargs):
for module_name, module in exec_globals.items():
if isinstance(module, type(importlib)):
globals()[module_name] = module
return exec_globals[function_name](*args, **kwargs)
return wrapped_function
def create_class(code, class_name):
    """Dynamically create a class from a string of code and a specified class name.

    Legacy ``CustomComponent`` import lines in ``code`` are rewritten to
    ``from langflow.custom import CustomComponent`` before parsing.

    :param code: String containing the Python code defining the class
    :param class_name: Name of the class to be created
    :return: The result of ``build_class_constructor`` for the compiled class
    :raises ValueError: If building the class raises a pydantic ``ValidationError``;
        the message joins the individual validation messages, one per line
    """
    if not hasattr(ast, "TypeIgnore"):
        ast.TypeIgnore = create_type_ignore_class()

    # Rewrite outdated CustomComponent import paths to the current location.
    legacy_import_lines = (
        "from langflow import CustomComponent",
        "from langflow.interface.custom.custom_component import CustomComponent",
    )
    for legacy_line in legacy_import_lines:
        code = code.replace(legacy_line, "from langflow.custom import CustomComponent")

    module = ast.parse(code)
    exec_globals = prepare_global_scope(code, module)
    class_node = extract_class_code(module, class_name)
    compiled_class = compile_class_code(class_node)

    try:
        return build_class_constructor(compiled_class, exec_globals, class_name)
    except ValidationError as e:
        # Keep only the text after the first comma of each message, when there is one.
        lines = []
        for error in e.errors():
            head, comma, tail = error["msg"].partition(",")
            lines.append(tail if comma else head)
        error_message = "\n".join(lines)
        raise ValueError(error_message) from e
def create_type_ignore_class():
    """Build a stand-in ``TypeIgnore`` node type for the ``ast`` module.

    :return: A new ``ast.AST`` subclass named ``TypeIgnore`` with no fields
    """

    class TypeIgnore(ast.AST):
        # The stand-in carries no child fields.
        _fields = ()

    return TypeIgnore
def prepare_global_scope(code, module):
"""Prepares the global scope with necessary imports from the provided code module.
:param module: AST parsed module
:return: Dictionary representing the global scope with imported modules
"""
exec_globals = globals().copy()
exec_globals.update(get_default_imports(code))
for node in module.body:
if isinstance(node, ast.Import):
for alias in node.names:
try:
exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name)
except ModuleNotFoundError as e:
msg = f"Module {alias.name} not found. Please install it and try again."
raise ModuleNotFoundError(msg) from e
elif isinstance(node, ast.ImportFrom) and node.module is not None:
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", LangChainDeprecationWarning)
imported_module = importlib.import_module(node.module)
for alias in node.names:
exec_globals[alias.name] = getattr(imported_module, alias.name)
except ModuleNotFoundError as e:
msg = f"Module {node.module} not found. Please install it and try again"
raise ModuleNotFoundError(msg) from e
elif isinstance(node, ast.ClassDef):
# Compile and execute the class definition to properly create the class
class_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
exec(class_code, exec_globals)
elif isinstance(node, ast.FunctionDef):
function_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
exec(function_code, exec_globals)
elif isinstance(node, ast.Assign):
assign_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
exec(assign_code, exec_globals)
return exec_globals
def extract_class_code(module, class_name):
    """Find the top-level class definition named ``class_name`` in a parsed module.

    The returned node gets a ``parent`` attribute set to ``None``.

    :param module: AST parsed module
    :param class_name: Name of the class to extract
    :return: AST node of the specified class
    :raises StopIteration: If the module has no top-level class with that name
    """

    def is_target(statement):
        return isinstance(statement, ast.ClassDef) and statement.name == class_name

    class_node = next(filter(is_target, module.body))
    class_node.parent = None
    return class_node
def compile_class_code(class_code):
    """Compile a single class-definition node into an executable code object.

    :param class_code: AST node of the class
    :return: Code object (filename ``"<string>"``, mode ``"exec"``) that defines the class when executed
    """
    # Wrap the lone node in a module so it can be compiled in "exec" mode.
    wrapper_module = ast.Module(body=[class_code], type_ignores=[])
    return compile(wrapper_module, "<string>", "exec")
def build_class_constructor(compiled_class, exec_globals, class_name):
    """Execute the compiled class definition and return the resulting class.

    NOTE(review): this runs the compiled code via ``exec``; confirm callers
    only pass code they are willing to execute.

    :param compiled_class: Compiled code object of the class
    :param exec_globals: Global scope with necessary imports; the class is defined into it
    :param class_name: Name of the class
    :return: The class object stored under ``class_name`` in ``exec_globals``
    """
    # Define the class straight into exec_globals. Routing it through
    # ``locals()`` is unreliable: since Python 3.13 each ``locals()`` call in a
    # function returns an independent snapshot, so the definition is lost.
    exec(compiled_class, exec_globals)

    # Publish the imported modules into this module's globals, then hand back the class.
    def build_custom_class():
        for name, value in exec_globals.items():
            if isinstance(value, type(importlib)):
                globals()[name] = value
        return exec_globals[class_name]

    build_custom_class.__globals__.update(exec_globals)
    return build_custom_class()
def get_default_imports(code_string):
    """Build the name-to-object mapping injected by default into dynamically created classes.

    The mapping always holds ``Optional``, ``List``, ``Dict`` and ``Union``.
    Each key of ``CUSTOM_COMPONENT_SUPPORTED_TYPES`` that ``find_names_in_code``
    reports for ``code_string`` is added, resolved as an attribute of
    ``langflow.field_typing``.
    """
    scope = {
        "Optional": Optional,
        "List": list,
        "Dict": dict,
        "Union": Union,
    }

    supported_names = list(CUSTOM_COMPONENT_SUPPORTED_TYPES.keys())
    referenced_names = find_names_in_code(code_string, supported_names)
    field_typing = importlib.import_module("langflow.field_typing")

    for name in referenced_names:
        scope[name] = getattr(field_typing, name)
    return scope
def find_names_in_code(code, names):
    """Select the names that occur as substrings of the given code string.

    :param code: The source code as a string.
    :param names: A list of names to check for in the code.
    :return: A set of names that are found in the code.
    """
    present = set()
    for candidate in names:
        # Plain substring test, not a token-aware match.
        if candidate in code:
            present.add(candidate)
    return present
def extract_function_name(code):
    """Return the name of the first top-level ``def`` in ``code``.

    :param code: Python source code as a string
    :return: Name of the first top-level function definition
    :raises ValueError: If the code has no top-level function definition
    """
    tree = ast.parse(code)
    first_def = next((stmt for stmt in tree.body if isinstance(stmt, ast.FunctionDef)), None)
    if first_def is None:
        msg = "No function definition found in the code string"
        raise ValueError(msg)
    return first_def.name
def extract_class_name(code: str) -> str:
    """Return the name of the first top-level class that looks like a Component subclass.

    A class qualifies when one of its bases is a bare name containing
    ``"Component"`` or ``"LC"``.

    Args:
        code (str): The source code to parse

    Returns:
        str: Name of the first matching class

    Raises:
        TypeError: If no matching class is found in the code
        ValueError: If the code is not valid Python syntax
    """
    try:
        module = ast.parse(code)
    except SyntaxError as e:
        msg = f"Invalid Python code: {e!s}"
        raise ValueError(msg) from e

    for statement in module.body:
        if not isinstance(statement, ast.ClassDef):
            continue
        # Check bases for Component inheritance
        # TODO: Build a more robust check for Component inheritance
        for base in statement.bases:
            if not isinstance(base, ast.Name):
                continue
            if "Component" in base.id or "LC" in base.id:
                return statement.name

    msg = f"No Component subclass found in the code string. Code snippet: {code[:100]}"
    raise TypeError(msg)