Spaces:
Running
Running
import ast | |
import contextlib | |
import importlib | |
import warnings | |
from types import FunctionType | |
from typing import Optional, Union | |
from langchain_core._api.deprecation import LangChainDeprecationWarning | |
from loguru import logger | |
from pydantic import ValidationError | |
from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES | |
def add_type_ignores() -> None: | |
if not hasattr(ast, "TypeIgnore"): | |
class TypeIgnore(ast.AST): | |
_fields = () | |
ast.TypeIgnore = TypeIgnore # type: ignore[assignment, misc] | |
def validate_code(code): | |
# Initialize the errors dictionary | |
errors = {"imports": {"errors": []}, "function": {"errors": []}} | |
# Parse the code string into an abstract syntax tree (AST) | |
try: | |
tree = ast.parse(code) | |
except Exception as e: # noqa: BLE001 | |
if hasattr(logger, "opt"): | |
logger.opt(exception=True).debug("Error parsing code") | |
else: | |
logger.debug("Error parsing code") | |
errors["function"]["errors"].append(str(e)) | |
return errors | |
# Add a dummy type_ignores field to the AST | |
add_type_ignores() | |
tree.type_ignores = [] | |
# Evaluate the import statements | |
for node in tree.body: | |
if isinstance(node, ast.Import): | |
for alias in node.names: | |
try: | |
importlib.import_module(alias.name) | |
except ModuleNotFoundError as e: | |
errors["imports"]["errors"].append(str(e)) | |
# Evaluate the function definition | |
for node in tree.body: | |
if isinstance(node, ast.FunctionDef): | |
code_obj = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec") | |
try: | |
exec(code_obj) | |
except Exception as e: # noqa: BLE001 | |
logger.opt(exception=True).debug("Error executing function code") | |
errors["function"]["errors"].append(str(e)) | |
# Return the errors dictionary | |
return errors | |
def eval_function(function_string: str): | |
# Create an empty dictionary to serve as a separate namespace | |
namespace: dict = {} | |
# Execute the code string in the new namespace | |
exec(function_string, namespace) | |
function_object = next( | |
( | |
obj | |
for name, obj in namespace.items() | |
if isinstance(obj, FunctionType) and obj.__code__.co_filename == "<string>" | |
), | |
None, | |
) | |
if function_object is None: | |
msg = "Function string does not contain a function" | |
raise ValueError(msg) | |
return function_object | |
def execute_function(code, function_name, *args, **kwargs): | |
add_type_ignores() | |
module = ast.parse(code) | |
exec_globals = globals().copy() | |
for node in module.body: | |
if isinstance(node, ast.Import): | |
for alias in node.names: | |
try: | |
exec( | |
f"{alias.asname or alias.name} = importlib.import_module('{alias.name}')", | |
exec_globals, | |
locals(), | |
) | |
exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) | |
except ModuleNotFoundError as e: | |
msg = f"Module {alias.name} not found. Please install it and try again." | |
raise ModuleNotFoundError(msg) from e | |
function_code = next( | |
node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name | |
) | |
function_code.parent = None | |
code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "<string>", "exec") | |
try: | |
exec(code_obj, exec_globals, locals()) | |
except Exception as exc: | |
msg = "Function string does not contain a function" | |
raise ValueError(msg) from exc | |
# Add the function to the exec_globals dictionary | |
exec_globals[function_name] = locals()[function_name] | |
return exec_globals[function_name](*args, **kwargs) | |
def create_function(code, function_name): | |
if not hasattr(ast, "TypeIgnore"): | |
class TypeIgnore(ast.AST): | |
_fields = () | |
ast.TypeIgnore = TypeIgnore | |
module = ast.parse(code) | |
exec_globals = globals().copy() | |
for node in module.body: | |
if isinstance(node, ast.Import | ast.ImportFrom): | |
for alias in node.names: | |
try: | |
if isinstance(node, ast.ImportFrom): | |
module_name = node.module | |
exec_globals[alias.asname or alias.name] = getattr( | |
importlib.import_module(module_name), alias.name | |
) | |
else: | |
module_name = alias.name | |
exec_globals[alias.asname or alias.name] = importlib.import_module(module_name) | |
except ModuleNotFoundError as e: | |
msg = f"Module {alias.name} not found. Please install it and try again." | |
raise ModuleNotFoundError(msg) from e | |
function_code = next( | |
node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name | |
) | |
function_code.parent = None | |
code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "<string>", "exec") | |
with contextlib.suppress(Exception): | |
exec(code_obj, exec_globals, locals()) | |
exec_globals[function_name] = locals()[function_name] | |
# Return a function that imports necessary modules and calls the target function | |
def wrapped_function(*args, **kwargs): | |
for module_name, module in exec_globals.items(): | |
if isinstance(module, type(importlib)): | |
globals()[module_name] = module | |
return exec_globals[function_name](*args, **kwargs) | |
return wrapped_function | |
def create_class(code, class_name): | |
"""Dynamically create a class from a string of code and a specified class name. | |
:param code: String containing the Python code defining the class | |
:param class_name: Name of the class to be created | |
:return: A function that, when called, returns an instance of the created class | |
""" | |
if not hasattr(ast, "TypeIgnore"): | |
ast.TypeIgnore = create_type_ignore_class() | |
# Replace from langflow import CustomComponent with from langflow.custom import CustomComponent | |
code = code.replace("from langflow import CustomComponent", "from langflow.custom import CustomComponent") | |
code = code.replace( | |
"from langflow.interface.custom.custom_component import CustomComponent", | |
"from langflow.custom import CustomComponent", | |
) | |
module = ast.parse(code) | |
exec_globals = prepare_global_scope(code, module) | |
class_code = extract_class_code(module, class_name) | |
compiled_class = compile_class_code(class_code) | |
try: | |
return build_class_constructor(compiled_class, exec_globals, class_name) | |
except ValidationError as e: | |
messages = [error["msg"].split(",", 1) for error in e.errors()] | |
error_message = "\n".join([message[1] if len(message) > 1 else message[0] for message in messages]) | |
raise ValueError(error_message) from e | |
def create_type_ignore_class(): | |
"""Create a TypeIgnore class for AST module if it doesn't exist. | |
:return: TypeIgnore class | |
""" | |
class TypeIgnore(ast.AST): | |
_fields = () | |
return TypeIgnore | |
def prepare_global_scope(code, module): | |
"""Prepares the global scope with necessary imports from the provided code module. | |
:param module: AST parsed module | |
:return: Dictionary representing the global scope with imported modules | |
""" | |
exec_globals = globals().copy() | |
exec_globals.update(get_default_imports(code)) | |
for node in module.body: | |
if isinstance(node, ast.Import): | |
for alias in node.names: | |
try: | |
exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) | |
except ModuleNotFoundError as e: | |
msg = f"Module {alias.name} not found. Please install it and try again." | |
raise ModuleNotFoundError(msg) from e | |
elif isinstance(node, ast.ImportFrom) and node.module is not None: | |
try: | |
with warnings.catch_warnings(): | |
warnings.simplefilter("ignore", LangChainDeprecationWarning) | |
imported_module = importlib.import_module(node.module) | |
for alias in node.names: | |
exec_globals[alias.name] = getattr(imported_module, alias.name) | |
except ModuleNotFoundError as e: | |
msg = f"Module {node.module} not found. Please install it and try again" | |
raise ModuleNotFoundError(msg) from e | |
elif isinstance(node, ast.ClassDef): | |
# Compile and execute the class definition to properly create the class | |
class_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec") | |
exec(class_code, exec_globals) | |
elif isinstance(node, ast.FunctionDef): | |
function_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec") | |
exec(function_code, exec_globals) | |
elif isinstance(node, ast.Assign): | |
assign_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec") | |
exec(assign_code, exec_globals) | |
return exec_globals | |
def extract_class_code(module, class_name): | |
"""Extracts the AST node for the specified class from the module. | |
:param module: AST parsed module | |
:param class_name: Name of the class to extract | |
:return: AST node of the specified class | |
""" | |
class_code = next(node for node in module.body if isinstance(node, ast.ClassDef) and node.name == class_name) | |
class_code.parent = None | |
return class_code | |
def compile_class_code(class_code): | |
"""Compiles the AST node of a class into a code object. | |
:param class_code: AST node of the class | |
:return: Compiled code object of the class | |
""" | |
return compile(ast.Module(body=[class_code], type_ignores=[]), "<string>", "exec") | |
def build_class_constructor(compiled_class, exec_globals, class_name): | |
"""Builds a constructor function for the dynamically created class. | |
:param compiled_class: Compiled code object of the class | |
:param exec_globals: Global scope with necessary imports | |
:param class_name: Name of the class | |
:return: Constructor function for the class | |
""" | |
exec(compiled_class, exec_globals, locals()) | |
exec_globals[class_name] = locals()[class_name] | |
# Return a function that imports necessary modules and creates an instance of the target class | |
def build_custom_class(): | |
for module_name, module in exec_globals.items(): | |
if isinstance(module, type(importlib)): | |
globals()[module_name] = module | |
exec_globals[class_name] | |
return exec_globals[class_name] | |
build_custom_class.__globals__.update(exec_globals) | |
return build_custom_class() | |
def get_default_imports(code_string): | |
"""Returns a dictionary of default imports for the dynamic class constructor.""" | |
default_imports = { | |
"Optional": Optional, | |
"List": list, | |
"Dict": dict, | |
"Union": Union, | |
} | |
langflow_imports = list(CUSTOM_COMPONENT_SUPPORTED_TYPES.keys()) | |
necessary_imports = find_names_in_code(code_string, langflow_imports) | |
langflow_module = importlib.import_module("langflow.field_typing") | |
default_imports.update({name: getattr(langflow_module, name) for name in necessary_imports}) | |
return default_imports | |
def find_names_in_code(code, names): | |
"""Finds if any of the specified names are present in the given code string. | |
:param code: The source code as a string. | |
:param names: A list of names to check for in the code. | |
:return: A set of names that are found in the code. | |
""" | |
return {name for name in names if name in code} | |
def extract_function_name(code): | |
module = ast.parse(code) | |
for node in module.body: | |
if isinstance(node, ast.FunctionDef): | |
return node.name | |
msg = "No function definition found in the code string" | |
raise ValueError(msg) | |
def extract_class_name(code: str) -> str: | |
"""Extract the name of the first Component subclass found in the code. | |
Args: | |
code (str): The source code to parse | |
Returns: | |
str: Name of the first Component subclass found | |
Raises: | |
ValueError: If no Component subclass is found in the code | |
""" | |
try: | |
module = ast.parse(code) | |
for node in module.body: | |
if not isinstance(node, ast.ClassDef): | |
continue | |
# Check bases for Component inheritance | |
# TODO: Build a more robust check for Component inheritance | |
for base in node.bases: | |
if isinstance(base, ast.Name) and any(pattern in base.id for pattern in ["Component", "LC"]): | |
return node.name | |
msg = f"No Component subclass found in the code string. Code snippet: {code[:100]}" | |
raise TypeError(msg) | |
except SyntaxError as e: | |
msg = f"Invalid Python code: {e!s}" | |
raise ValueError(msg) from e | |