Spaces:
Running
Running
from __future__ import annotations | |
import inspect | |
import os | |
import warnings | |
from typing import TYPE_CHECKING, Any | |
import orjson | |
from loguru import logger | |
from pydantic import PydanticDeprecatedSince20 | |
from langflow.custom.eval import eval_custom_component_code | |
from langflow.schema import Data | |
from langflow.schema.artifact import get_artifact_type, post_process_raw | |
from langflow.services.deps import get_tracing_service | |
if TYPE_CHECKING: | |
from langflow.custom import Component, CustomComponent | |
from langflow.events.event_manager import EventManager | |
from langflow.graph.vertex.base import Vertex | |
def instantiate_class(
    vertex: Vertex,
    user_id=None,
    event_manager: EventManager | None = None,
) -> Any:
    """Create the component instance described by ``vertex``.

    The vertex params are normalised with ``get_params``; the ``"code"``
    entry is removed from them and evaluated into a component class, which
    is then instantiated with the remaining params.

    Args:
        vertex: Graph vertex supplying ``vertex_type``, ``base_type``,
            ``params`` and ``id``.
        user_id: Passed to the component as ``_user_id``.
        event_manager: Handed to the component through
            ``set_event_manager`` when the component exposes that method.

    Returns:
        A ``(component, params)`` tuple, where ``params`` no longer contains
        the ``"code"`` entry.

    Raises:
        ValueError: If the vertex has no base type.
        KeyError: If the params contain no ``"code"`` entry.
    """
    vertex_type, base_type = vertex.vertex_type, vertex.base_type
    logger.debug(f"Instantiating {vertex_type} of type {base_type}")
    if not base_type:
        msg = "No base type provided for vertex"
        raise ValueError(msg)

    custom_params = get_params(vertex.params)
    source_code = custom_params.pop("code")
    component_cls: type[CustomComponent | Component] = eval_custom_component_code(source_code)

    # Keyword order mirrors the constructor call this replaces.
    init_kwargs = {
        "_user_id": user_id,
        "_parameters": custom_params,
        "_vertex": vertex,
        "_tracing_service": get_tracing_service(),
        "_id": vertex.id,
    }
    custom_component: CustomComponent | Component = component_cls(**init_kwargs)

    # Not every component class exposes an event-manager hook.
    if hasattr(custom_component, "set_event_manager"):
        custom_component.set_event_manager(event_manager)
    return custom_component, custom_params
async def get_instance_results(
    custom_component,
    custom_params: dict,
    vertex: Vertex,
    *,
    fallback_to_env_vars: bool = False,
    base_type: str = "component",
):
    """Resolve stored-variable fields, then build the component.

    Args:
        custom_component: Component instance to build.
        custom_params: Parameters for the build; fields listed in
            ``vertex.load_from_db_fields`` are resolved first.
        vertex: Vertex providing ``load_from_db_fields``.
        fallback_to_env_vars: Forwarded to
            ``update_params_with_load_from_db_fields``.
        base_type: ``"custom_components"`` or ``"component"``; selects the
            build routine.

    Returns:
        Whatever the selected build routine returns.

    Raises:
        ValueError: If ``base_type`` is neither of the supported values.
    """
    resolved_params = update_params_with_load_from_db_fields(
        custom_component,
        custom_params,
        vertex.load_from_db_fields,
        fallback_to_env_vars=fallback_to_env_vars,
    )
    # Pydantic v2 deprecation warnings are silenced for the duration of the build.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
        if base_type == "custom_components":
            builder = build_custom_component
        elif base_type == "component":
            builder = build_component
        else:
            msg = f"Base type {base_type} not found."
            raise ValueError(msg)
        return await builder(params=resolved_params, custom_component=custom_component)
def get_params(vertex_params):
    """Normalise vertex params and return a shallow copy of the result.

    ``vertex_params`` is passed through ``convert_params_to_sets`` and then
    ``convert_kwargs``; both operate on the mapping they are given, so the
    input itself is modified before being copied.
    """
    return convert_kwargs(convert_params_to_sets(vertex_params)).copy()
def convert_params_to_sets(params):
    """Convert the special-token params to sets, in place.

    ``allowed_special`` and ``disallowed_special`` are replaced by
    ``set(value)`` when present. The literal string ``"all"`` is left
    untouched: tokenizer APIs accept it as a sentinel meaning "every special
    token", and ``set("all")`` would silently turn it into ``{"a", "l"}``.

    Args:
        params: Mutable mapping of parameters; modified in place.

    Returns:
        The same ``params`` object.
    """
    for name in ("allowed_special", "disallowed_special"):
        if name not in params:
            continue
        value = params[name]
        # Keep the "all" sentinel intact instead of splitting it into characters.
        if isinstance(value, str) and value == "all":
            continue
        params[name] = set(value)
    return params
def convert_kwargs(params):
    """Decode JSON-encoded ``kwargs``/``config`` params, in place.

    A string value whose key contains ``"kwargs"`` or ``"config"`` is parsed
    with ``orjson.loads`` and replaced by the parsed object. Entries whose
    string cannot be parsed are removed from ``params``.

    Args:
        params: Mutable mapping of parameters; modified in place.

    Returns:
        The same ``params`` object.
    """
    undecodable = []
    for name, raw in params.items():
        is_json_field = "kwargs" in name or "config" in name
        if not (is_json_field and isinstance(raw, str)):
            continue
        try:
            decoded = orjson.loads(raw)
        except orjson.JSONDecodeError:
            # Deletion is deferred: the dict must not change size mid-iteration.
            undecodable.append(name)
        else:
            # Rebinding an existing key keeps the dict size stable, so this is safe here.
            params[name] = decoded
    for name in undecodable:
        params.pop(name, None)
    return params
def update_params_with_load_from_db_fields(
    custom_component: CustomComponent,
    params,
    load_from_db_fields,
    *,
    fallback_to_env_vars=False,
):
    """Replace variable references in ``params`` with their resolved values.

    For every field in ``load_from_db_fields`` that is present in ``params``,
    the current value is passed to ``custom_component.variables(value, field)``
    and the field is overwritten with the result (``None`` when nothing could
    be resolved).

    Args:
        custom_component: Object exposing ``variables(name, field)``.
        params: Mutable mapping of parameters; modified in place.
        load_from_db_fields: Names of the fields to resolve.
        fallback_to_env_vars: When true and the lookup produced nothing, the
            environment variable named by the field's current value is used.

    Returns:
        The same ``params`` object.

    Raises:
        ValueError: Re-raised from ``variables`` when its message contains
            "User id is not set" or "variable not found."; any other
            ``ValueError`` is logged at debug level and treated as a miss.
    """
    for field_name in load_from_db_fields:
        if field_name not in params:
            continue

        variable_name = params[field_name]
        try:
            resolved = custom_component.variables(variable_name, field_name)
        except ValueError as exc:
            detail = str(exc)
            if "User id is not set" in detail or "variable not found." in detail:
                raise
            logger.debug(detail)
            resolved = None

        if resolved is None and fallback_to_env_vars:
            resolved = os.getenv(variable_name)
            if resolved:
                logger.info(f"Using environment variable {variable_name} for {field_name}")
            else:
                logger.error(f"Environment variable {variable_name} is not set.")

        params[field_name] = resolved
        if resolved is None:
            logger.warning(f"Could not get value for {field_name}. Setting it to None.")
    return params
async def build_component(
    params: dict,
    custom_component: Component,
):
    """Apply ``params`` to the component and run its build.

    Args:
        params: Passed to ``custom_component.set_attributes``.
        custom_component: Component to configure and build.

    Returns:
        A ``(custom_component, build_results, artifacts)`` tuple, where the
        last two items are the pair returned by ``build_results()``.
    """
    custom_component.set_attributes(params)
    outcome = await custom_component.build_results()
    build_results, artifacts = outcome
    return custom_component, build_results, artifacts
async def build_custom_component(params: dict, custom_component: CustomComponent):
    """Run a custom component's ``build`` and assemble its artifact.

    Args:
        params: Keyword arguments for ``custom_component.build``. A
            ``"retriever"`` entry exposing ``as_retriever`` is replaced by the
            result of calling it.
        custom_component: Component whose ``build`` (sync or async) is run.

    Returns:
        A ``(custom_component, build_result, artifact)`` tuple, where
        ``artifact`` is a dict with ``"repr"``, ``"raw"`` and ``"type"`` keys.
        The artifact and the result are also stored on the component under the
        name of the vertex's first output.

    Raises:
        ValueError: If the component has no vertex. This is only checked after
            the build has already run.
    """
    retriever = params.get("retriever")
    if hasattr(retriever, "as_retriever"):
        params["retriever"] = retriever.as_retriever()

    # An async build returns a coroutine that still has to be awaited.
    needs_await = inspect.iscoroutinefunction(custom_component.build)
    build_result = custom_component.build(**params)
    if needs_await:
        build_result = await build_result

    result_is_plain = isinstance(build_result, dict | Data | str)

    # Textual representation: the component's own repr wins, else the plain result.
    custom_repr = custom_component.custom_repr()
    if custom_repr is None and result_is_plain:
        custom_repr = build_result
    custom_repr = custom_repr if isinstance(custom_repr, str) else str(custom_repr)

    # Raw payload: unwrap the repr value when it carries data or can be dumped.
    raw = custom_component.repr_value
    if raw is not None:
        if hasattr(raw, "data"):
            raw = raw.data
        elif hasattr(raw, "model_dump"):
            raw = raw.model_dump()
    if raw is None and result_is_plain:
        raw = build_result.data if isinstance(build_result, Data) else build_result

    artifact_type = get_artifact_type(custom_component.repr_value or raw, build_result)
    raw = post_process_raw(raw, artifact_type)
    artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}

    vertex = custom_component._vertex
    if vertex is None:
        msg = "Custom component does not have a vertex"
        raise ValueError(msg)

    output_name = vertex.outputs[0].get("name")
    custom_component._artifacts = {output_name: artifact}
    custom_component._results = {output_name: build_result}
    return custom_component, build_result, artifact