import ast import contextlib import importlib import warnings from types import FunctionType from typing import Optional, Union from langchain_core._api.deprecation import LangChainDeprecationWarning from loguru import logger from pydantic import ValidationError from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES def add_type_ignores() -> None: if not hasattr(ast, "TypeIgnore"): class TypeIgnore(ast.AST): _fields = () ast.TypeIgnore = TypeIgnore # type: ignore[assignment, misc] def validate_code(code): # Initialize the errors dictionary errors = {"imports": {"errors": []}, "function": {"errors": []}} # Parse the code string into an abstract syntax tree (AST) try: tree = ast.parse(code) except Exception as e: # noqa: BLE001 if hasattr(logger, "opt"): logger.opt(exception=True).debug("Error parsing code") else: logger.debug("Error parsing code") errors["function"]["errors"].append(str(e)) return errors # Add a dummy type_ignores field to the AST add_type_ignores() tree.type_ignores = [] # Evaluate the import statements for node in tree.body: if isinstance(node, ast.Import): for alias in node.names: try: importlib.import_module(alias.name) except ModuleNotFoundError as e: errors["imports"]["errors"].append(str(e)) # Evaluate the function definition for node in tree.body: if isinstance(node, ast.FunctionDef): code_obj = compile(ast.Module(body=[node], type_ignores=[]), "", "exec") try: exec(code_obj) except Exception as e: # noqa: BLE001 logger.opt(exception=True).debug("Error executing function code") errors["function"]["errors"].append(str(e)) # Return the errors dictionary return errors def eval_function(function_string: str): # Create an empty dictionary to serve as a separate namespace namespace: dict = {} # Execute the code string in the new namespace exec(function_string, namespace) function_object = next( ( obj for name, obj in namespace.items() if isinstance(obj, FunctionType) and obj.__code__.co_filename == "" ), None, ) if function_object is None: msg = "Function string does not contain a function" raise ValueError(msg) return function_object def execute_function(code, function_name, *args, **kwargs): add_type_ignores() module = ast.parse(code) exec_globals = globals().copy() for node in module.body: if isinstance(node, ast.Import): for alias in node.names: try: exec( f"{alias.asname or alias.name} = importlib.import_module('{alias.name}')", exec_globals, locals(), ) exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) except ModuleNotFoundError as e: msg = f"Module {alias.name} not found. Please install it and try again." raise ModuleNotFoundError(msg) from e function_code = next( node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name ) function_code.parent = None code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "", "exec") try: exec(code_obj, exec_globals, locals()) except Exception as exc: msg = "Function string does not contain a function" raise ValueError(msg) from exc # Add the function to the exec_globals dictionary exec_globals[function_name] = locals()[function_name] return exec_globals[function_name](*args, **kwargs) def create_function(code, function_name): if not hasattr(ast, "TypeIgnore"): class TypeIgnore(ast.AST): _fields = () ast.TypeIgnore = TypeIgnore module = ast.parse(code) exec_globals = globals().copy() for node in module.body: if isinstance(node, ast.Import | ast.ImportFrom): for alias in node.names: try: if isinstance(node, ast.ImportFrom): module_name = node.module exec_globals[alias.asname or alias.name] = getattr( importlib.import_module(module_name), alias.name ) else: module_name = alias.name exec_globals[alias.asname or alias.name] = importlib.import_module(module_name) except ModuleNotFoundError as e: msg = f"Module {alias.name} not found. Please install it and try again." raise ModuleNotFoundError(msg) from e function_code = next( node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name ) function_code.parent = None code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "", "exec") with contextlib.suppress(Exception): exec(code_obj, exec_globals, locals()) exec_globals[function_name] = locals()[function_name] # Return a function that imports necessary modules and calls the target function def wrapped_function(*args, **kwargs): for module_name, module in exec_globals.items(): if isinstance(module, type(importlib)): globals()[module_name] = module return exec_globals[function_name](*args, **kwargs) return wrapped_function def create_class(code, class_name): """Dynamically create a class from a string of code and a specified class name. :param code: String containing the Python code defining the class :param class_name: Name of the class to be created :return: A function that, when called, returns an instance of the created class """ if not hasattr(ast, "TypeIgnore"): ast.TypeIgnore = create_type_ignore_class() # Replace from langflow import CustomComponent with from langflow.custom import CustomComponent code = code.replace("from langflow import CustomComponent", "from langflow.custom import CustomComponent") code = code.replace( "from langflow.interface.custom.custom_component import CustomComponent", "from langflow.custom import CustomComponent", ) module = ast.parse(code) exec_globals = prepare_global_scope(code, module) class_code = extract_class_code(module, class_name) compiled_class = compile_class_code(class_code) try: return build_class_constructor(compiled_class, exec_globals, class_name) except ValidationError as e: messages = [error["msg"].split(",", 1) for error in e.errors()] error_message = "\n".join([message[1] if len(message) > 1 else message[0] for message in messages]) raise ValueError(error_message) from e def create_type_ignore_class(): """Create a TypeIgnore class for AST module if it doesn't exist. :return: TypeIgnore class """ class TypeIgnore(ast.AST): _fields = () return TypeIgnore def prepare_global_scope(code, module): """Prepares the global scope with necessary imports from the provided code module. :param module: AST parsed module :return: Dictionary representing the global scope with imported modules """ exec_globals = globals().copy() exec_globals.update(get_default_imports(code)) for node in module.body: if isinstance(node, ast.Import): for alias in node.names: try: exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) except ModuleNotFoundError as e: msg = f"Module {alias.name} not found. Please install it and try again." raise ModuleNotFoundError(msg) from e elif isinstance(node, ast.ImportFrom) and node.module is not None: try: with warnings.catch_warnings(): warnings.simplefilter("ignore", LangChainDeprecationWarning) imported_module = importlib.import_module(node.module) for alias in node.names: exec_globals[alias.name] = getattr(imported_module, alias.name) except ModuleNotFoundError as e: msg = f"Module {node.module} not found. Please install it and try again" raise ModuleNotFoundError(msg) from e elif isinstance(node, ast.ClassDef): # Compile and execute the class definition to properly create the class class_code = compile(ast.Module(body=[node], type_ignores=[]), "", "exec") exec(class_code, exec_globals) elif isinstance(node, ast.FunctionDef): function_code = compile(ast.Module(body=[node], type_ignores=[]), "", "exec") exec(function_code, exec_globals) elif isinstance(node, ast.Assign): assign_code = compile(ast.Module(body=[node], type_ignores=[]), "", "exec") exec(assign_code, exec_globals) return exec_globals def extract_class_code(module, class_name): """Extracts the AST node for the specified class from the module. :param module: AST parsed module :param class_name: Name of the class to extract :return: AST node of the specified class """ class_code = next(node for node in module.body if isinstance(node, ast.ClassDef) and node.name == class_name) class_code.parent = None return class_code def compile_class_code(class_code): """Compiles the AST node of a class into a code object. :param class_code: AST node of the class :return: Compiled code object of the class """ return compile(ast.Module(body=[class_code], type_ignores=[]), "", "exec") def build_class_constructor(compiled_class, exec_globals, class_name): """Builds a constructor function for the dynamically created class. :param compiled_class: Compiled code object of the class :param exec_globals: Global scope with necessary imports :param class_name: Name of the class :return: Constructor function for the class """ exec(compiled_class, exec_globals, locals()) exec_globals[class_name] = locals()[class_name] # Return a function that imports necessary modules and creates an instance of the target class def build_custom_class(): for module_name, module in exec_globals.items(): if isinstance(module, type(importlib)): globals()[module_name] = module exec_globals[class_name] return exec_globals[class_name] build_custom_class.__globals__.update(exec_globals) return build_custom_class() def get_default_imports(code_string): """Returns a dictionary of default imports for the dynamic class constructor.""" default_imports = { "Optional": Optional, "List": list, "Dict": dict, "Union": Union, } langflow_imports = list(CUSTOM_COMPONENT_SUPPORTED_TYPES.keys()) necessary_imports = find_names_in_code(code_string, langflow_imports) langflow_module = importlib.import_module("langflow.field_typing") default_imports.update({name: getattr(langflow_module, name) for name in necessary_imports}) return default_imports def find_names_in_code(code, names): """Finds if any of the specified names are present in the given code string. :param code: The source code as a string. :param names: A list of names to check for in the code. :return: A set of names that are found in the code. """ return {name for name in names if name in code} def extract_function_name(code): module = ast.parse(code) for node in module.body: if isinstance(node, ast.FunctionDef): return node.name msg = "No function definition found in the code string" raise ValueError(msg) def extract_class_name(code: str) -> str: """Extract the name of the first Component subclass found in the code. Args: code (str): The source code to parse Returns: str: Name of the first Component subclass found Raises: ValueError: If no Component subclass is found in the code """ try: module = ast.parse(code) for node in module.body: if not isinstance(node, ast.ClassDef): continue # Check bases for Component inheritance # TODO: Build a more robust check for Component inheritance for base in node.bases: if isinstance(base, ast.Name) and any(pattern in base.id for pattern in ["Component", "LC"]): return node.name msg = f"No Component subclass found in the code string. Code snippet: {code[:100]}" raise TypeError(msg) except SyntaxError as e: msg = f"Invalid Python code: {e!s}" raise ValueError(msg) from e