Tai Truong
fix readme
d202ada
from __future__ import annotations
import inspect
import os
import warnings
from typing import TYPE_CHECKING, Any
import orjson
from loguru import logger
from pydantic import PydanticDeprecatedSince20
from langflow.custom.eval import eval_custom_component_code
from langflow.schema import Data
from langflow.schema.artifact import get_artifact_type, post_process_raw
from langflow.services.deps import get_tracing_service
if TYPE_CHECKING:
from langflow.custom import Component, CustomComponent
from langflow.events.event_manager import EventManager
from langflow.graph.vertex.base import Vertex
def instantiate_class(
vertex: Vertex,
user_id=None,
event_manager: EventManager | None = None,
) -> Any:
"""Instantiate class from module type and key, and params."""
vertex_type = vertex.vertex_type
base_type = vertex.base_type
logger.debug(f"Instantiating {vertex_type} of type {base_type}")
if not base_type:
msg = "No base type provided for vertex"
raise ValueError(msg)
custom_params = get_params(vertex.params)
code = custom_params.pop("code")
class_object: type[CustomComponent | Component] = eval_custom_component_code(code)
custom_component: CustomComponent | Component = class_object(
_user_id=user_id,
_parameters=custom_params,
_vertex=vertex,
_tracing_service=get_tracing_service(),
_id=vertex.id,
)
if hasattr(custom_component, "set_event_manager"):
custom_component.set_event_manager(event_manager)
return custom_component, custom_params
async def get_instance_results(
custom_component,
custom_params: dict,
vertex: Vertex,
*,
fallback_to_env_vars: bool = False,
base_type: str = "component",
):
custom_params = update_params_with_load_from_db_fields(
custom_component, custom_params, vertex.load_from_db_fields, fallback_to_env_vars=fallback_to_env_vars
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
if base_type == "custom_components":
return await build_custom_component(params=custom_params, custom_component=custom_component)
if base_type == "component":
return await build_component(params=custom_params, custom_component=custom_component)
msg = f"Base type {base_type} not found."
raise ValueError(msg)
def get_params(vertex_params):
params = vertex_params
params = convert_params_to_sets(params)
params = convert_kwargs(params)
return params.copy()
def convert_params_to_sets(params):
"""Convert certain params to sets."""
if "allowed_special" in params:
params["allowed_special"] = set(params["allowed_special"])
if "disallowed_special" in params:
params["disallowed_special"] = set(params["disallowed_special"])
return params
def convert_kwargs(params):
# Loop through items to avoid repeated lookups
items_to_remove = []
for key, value in params.items():
if ("kwargs" in key or "config" in key) and isinstance(value, str):
try:
params[key] = orjson.loads(value)
except orjson.JSONDecodeError:
items_to_remove.append(key)
# Remove invalid keys outside the loop to avoid modifying dict during iteration
for key in items_to_remove:
params.pop(key, None)
return params
def update_params_with_load_from_db_fields(
custom_component: CustomComponent,
params,
load_from_db_fields,
*,
fallback_to_env_vars=False,
):
for field in load_from_db_fields:
if field not in params:
continue
try:
key = custom_component.variables(params[field], field)
except ValueError as e:
if any(reason in str(e) for reason in ["User id is not set", "variable not found."]):
raise
logger.debug(str(e))
key = None
if fallback_to_env_vars and key is None:
key = os.getenv(params[field])
if key:
logger.info(f"Using environment variable {params[field]} for {field}")
else:
logger.error(f"Environment variable {params[field]} is not set.")
params[field] = key if key is not None else None
if key is None:
logger.warning(f"Could not get value for {field}. Setting it to None.")
return params
async def build_component(
params: dict,
custom_component: Component,
):
# Now set the params as attributes of the custom_component
custom_component.set_attributes(params)
build_results, artifacts = await custom_component.build_results()
return custom_component, build_results, artifacts
async def build_custom_component(params: dict, custom_component: CustomComponent):
if "retriever" in params and hasattr(params["retriever"], "as_retriever"):
params["retriever"] = params["retriever"].as_retriever()
# Determine if the build method is asynchronous
is_async = inspect.iscoroutinefunction(custom_component.build)
# New feature: the component has a list of outputs and we have
# to check the vertex.edges to see which is connected (coulb be multiple)
# and then we'll get the output which has the name of the method we should call.
# the methods don't require any params because they are already set in the custom_component
# so we can just call them
if is_async:
# Await the build method directly if it's async
build_result = await custom_component.build(**params)
else:
# Call the build method directly if it's sync
build_result = custom_component.build(**params)
custom_repr = custom_component.custom_repr()
if custom_repr is None and isinstance(build_result, dict | Data | str):
custom_repr = build_result
if not isinstance(custom_repr, str):
custom_repr = str(custom_repr)
raw = custom_component.repr_value
if hasattr(raw, "data") and raw is not None:
raw = raw.data
elif hasattr(raw, "model_dump") and raw is not None:
raw = raw.model_dump()
if raw is None and isinstance(build_result, dict | Data | str):
raw = build_result.data if isinstance(build_result, Data) else build_result
artifact_type = get_artifact_type(custom_component.repr_value or raw, build_result)
raw = post_process_raw(raw, artifact_type)
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
if custom_component._vertex is not None:
custom_component._artifacts = {custom_component._vertex.outputs[0].get("name"): artifact}
custom_component._results = {custom_component._vertex.outputs[0].get("name"): build_result}
return custom_component, build_result, artifact
msg = "Custom component does not have a vertex"
raise ValueError(msg)