from __future__ import annotations import inspect import os import warnings from typing import TYPE_CHECKING, Any import orjson from loguru import logger from pydantic import PydanticDeprecatedSince20 from langflow.custom.eval import eval_custom_component_code from langflow.schema import Data from langflow.schema.artifact import get_artifact_type, post_process_raw from langflow.services.deps import get_tracing_service if TYPE_CHECKING: from langflow.custom import Component, CustomComponent from langflow.events.event_manager import EventManager from langflow.graph.vertex.base import Vertex def instantiate_class( vertex: Vertex, user_id=None, event_manager: EventManager | None = None, ) -> Any: """Instantiate class from module type and key, and params.""" vertex_type = vertex.vertex_type base_type = vertex.base_type logger.debug(f"Instantiating {vertex_type} of type {base_type}") if not base_type: msg = "No base type provided for vertex" raise ValueError(msg) custom_params = get_params(vertex.params) code = custom_params.pop("code") class_object: type[CustomComponent | Component] = eval_custom_component_code(code) custom_component: CustomComponent | Component = class_object( _user_id=user_id, _parameters=custom_params, _vertex=vertex, _tracing_service=get_tracing_service(), _id=vertex.id, ) if hasattr(custom_component, "set_event_manager"): custom_component.set_event_manager(event_manager) return custom_component, custom_params async def get_instance_results( custom_component, custom_params: dict, vertex: Vertex, *, fallback_to_env_vars: bool = False, base_type: str = "component", ): custom_params = update_params_with_load_from_db_fields( custom_component, custom_params, vertex.load_from_db_fields, fallback_to_env_vars=fallback_to_env_vars ) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20) if base_type == "custom_components": return await build_custom_component(params=custom_params, custom_component=custom_component) if base_type == "component": return await build_component(params=custom_params, custom_component=custom_component) msg = f"Base type {base_type} not found." raise ValueError(msg) def get_params(vertex_params): params = vertex_params params = convert_params_to_sets(params) params = convert_kwargs(params) return params.copy() def convert_params_to_sets(params): """Convert certain params to sets.""" if "allowed_special" in params: params["allowed_special"] = set(params["allowed_special"]) if "disallowed_special" in params: params["disallowed_special"] = set(params["disallowed_special"]) return params def convert_kwargs(params): # Loop through items to avoid repeated lookups items_to_remove = [] for key, value in params.items(): if ("kwargs" in key or "config" in key) and isinstance(value, str): try: params[key] = orjson.loads(value) except orjson.JSONDecodeError: items_to_remove.append(key) # Remove invalid keys outside the loop to avoid modifying dict during iteration for key in items_to_remove: params.pop(key, None) return params def update_params_with_load_from_db_fields( custom_component: CustomComponent, params, load_from_db_fields, *, fallback_to_env_vars=False, ): for field in load_from_db_fields: if field not in params: continue try: key = custom_component.variables(params[field], field) except ValueError as e: if any(reason in str(e) for reason in ["User id is not set", "variable not found."]): raise logger.debug(str(e)) key = None if fallback_to_env_vars and key is None: key = os.getenv(params[field]) if key: logger.info(f"Using environment variable {params[field]} for {field}") else: logger.error(f"Environment variable {params[field]} is not set.") params[field] = key if key is not None else None if key is None: logger.warning(f"Could not get value for {field}. Setting it to None.") return params async def build_component( params: dict, custom_component: Component, ): # Now set the params as attributes of the custom_component custom_component.set_attributes(params) build_results, artifacts = await custom_component.build_results() return custom_component, build_results, artifacts async def build_custom_component(params: dict, custom_component: CustomComponent): if "retriever" in params and hasattr(params["retriever"], "as_retriever"): params["retriever"] = params["retriever"].as_retriever() # Determine if the build method is asynchronous is_async = inspect.iscoroutinefunction(custom_component.build) # New feature: the component has a list of outputs and we have # to check the vertex.edges to see which is connected (coulb be multiple) # and then we'll get the output which has the name of the method we should call. # the methods don't require any params because they are already set in the custom_component # so we can just call them if is_async: # Await the build method directly if it's async build_result = await custom_component.build(**params) else: # Call the build method directly if it's sync build_result = custom_component.build(**params) custom_repr = custom_component.custom_repr() if custom_repr is None and isinstance(build_result, dict | Data | str): custom_repr = build_result if not isinstance(custom_repr, str): custom_repr = str(custom_repr) raw = custom_component.repr_value if hasattr(raw, "data") and raw is not None: raw = raw.data elif hasattr(raw, "model_dump") and raw is not None: raw = raw.model_dump() if raw is None and isinstance(build_result, dict | Data | str): raw = build_result.data if isinstance(build_result, Data) else build_result artifact_type = get_artifact_type(custom_component.repr_value or raw, build_result) raw = post_process_raw(raw, artifact_type) artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} if custom_component._vertex is not None: custom_component._artifacts = {custom_component._vertex.outputs[0].get("name"): artifact} custom_component._results = {custom_component._vertex.outputs[0].get("name"): build_result} return custom_component, build_result, artifact msg = "Custom component does not have a vertex" raise ValueError(msg)