Spaces:
Running
Running
from collections.abc import Generator | |
from enum import Enum | |
from typing import Literal | |
from pydantic import BaseModel | |
from typing_extensions import TypedDict | |
from langflow.schema.data import Data | |
from langflow.schema.dataframe import DataFrame | |
from langflow.schema.message import Message | |
from langflow.schema.serialize import recursive_serialize_or_str | |
INPUT_FIELD_NAME = "input_value" | |
InputType = Literal["chat", "text", "any"] | |
OutputType = Literal["chat", "text", "any", "debug"] | |
class LogType(str, Enum): | |
MESSAGE = "message" | |
DATA = "data" | |
STREAM = "stream" | |
OBJECT = "object" | |
ARRAY = "array" | |
TEXT = "text" | |
UNKNOWN = "unknown" | |
class StreamURL(TypedDict): | |
location: str | |
class ErrorLog(TypedDict): | |
errorMessage: str | |
stackTrace: str | |
class OutputValue(BaseModel): | |
message: ErrorLog | StreamURL | dict | list | str | |
type: str | |
def get_type(payload): | |
result = LogType.UNKNOWN | |
match payload: | |
case Message(): | |
result = LogType.MESSAGE | |
case Data(): | |
result = LogType.DATA | |
case dict(): | |
result = LogType.OBJECT | |
case list() | DataFrame(): | |
result = LogType.ARRAY | |
case str(): | |
result = LogType.TEXT | |
if result == LogType.UNKNOWN and ( | |
(payload and isinstance(payload, Generator)) | |
or (isinstance(payload, Message) and isinstance(payload.text, Generator)) | |
): | |
result = LogType.STREAM | |
return result | |
def get_message(payload): | |
message = None | |
if hasattr(payload, "data"): | |
message = payload.data | |
elif hasattr(payload, "model_dump"): | |
message = payload.model_dump() | |
if message is None and isinstance(payload, dict | str | Data): | |
message = payload.data if isinstance(payload, Data) else payload | |
return message or payload | |
def build_output_logs(vertex, result) -> dict: | |
outputs: dict[str, OutputValue] = {} | |
component_instance = result[0] | |
for index, output in enumerate(vertex.outputs): | |
if component_instance.status is None: | |
payload = component_instance._results | |
output_result = payload.get(output["name"]) | |
else: | |
payload = component_instance._artifacts | |
output_result = payload.get(output["name"], {}).get("raw") | |
message = get_message(output_result) | |
type_ = get_type(output_result) | |
match type_: | |
case LogType.STREAM if "stream_url" in message: | |
message = StreamURL(location=message["stream_url"]) | |
case LogType.STREAM: | |
message = "" | |
case LogType.MESSAGE if hasattr(message, "message"): | |
message = message.message | |
case LogType.UNKNOWN: | |
message = "" | |
case LogType.ARRAY: | |
if isinstance(message, DataFrame): | |
message = message.to_dict(orient="records") | |
message = [recursive_serialize_or_str(item) for item in message] | |
name = output.get("name", f"output_{index}") | |
outputs |= {name: OutputValue(message=message, type=type_).model_dump()} | |
return outputs | |