Tai Truong
fix readme
d202ada
from collections.abc import Generator
from enum import Enum
from typing import Literal
from pydantic import BaseModel
from typing_extensions import TypedDict
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.schema.serialize import recursive_serialize_or_str
# String constant "input_value". It is not referenced in the visible part of
# this module; presumably the default key for a component's input field —
# verify against callers.
INPUT_FIELD_NAME = "input_value"
# Closed set of string literals accepted as an input kind.
InputType = Literal["chat", "text", "any"]
# Closed set of string literals accepted as an output kind: the InputType
# values plus "debug".
OutputType = Literal["chat", "text", "any", "debug"]
class LogType(str, Enum):
    # Category labels returned by get_type() and matched on in
    # build_output_logs(). The str mixin makes each member compare equal to
    # its plain string value.
    MESSAGE = "message"  # payload is a Message
    DATA = "data"  # payload is a Data that did not match Message first
    STREAM = "stream"  # payload is a generator
    OBJECT = "object"  # payload is a dict
    ARRAY = "array"  # payload is a list or a DataFrame
    TEXT = "text"  # payload is a str
    UNKNOWN = "unknown"  # payload matched none of the categories above
class StreamURL(TypedDict):
    # Dict shape that build_output_logs() emits for a stream output whose
    # message carries a "stream_url" entry.
    location: str  # filled from the message's "stream_url" value
class ErrorLog(TypedDict):
    # Dict shape accepted as one alternative of OutputValue.message. Nothing
    # in the visible part of this module constructs it.
    errorMessage: str  # presumably the human-readable error text — confirm
    stackTrace: str  # presumably the formatted traceback — confirm
class OutputValue(BaseModel):
    # Pydantic model for one logged output. build_output_logs() stores the
    # model_dump() of an instance under each output name.

    # Normalised payload: an ErrorLog or StreamURL dict, a plain dict, a list,
    # or a string.
    message: ErrorLog | StreamURL | dict | list | str
    # Category of the payload; build_output_logs() passes a LogType member.
    type: str
def get_type(payload):
result = LogType.UNKNOWN
match payload:
case Message():
result = LogType.MESSAGE
case Data():
result = LogType.DATA
case dict():
result = LogType.OBJECT
case list() | DataFrame():
result = LogType.ARRAY
case str():
result = LogType.TEXT
if result == LogType.UNKNOWN and (
(payload and isinstance(payload, Generator))
or (isinstance(payload, Message) and isinstance(payload.text, Generator))
):
result = LogType.STREAM
return result
def get_message(payload):
    """Extract the loggable content of *payload*.

    The content is looked up in this order:

    1. ``payload.data`` when the object has a ``data`` attribute;
    2. ``payload.model_dump()`` when it has a ``model_dump`` method;
    3. otherwise nothing is extracted.

    Args:
        payload: Any value produced by a component output.

    Returns:
        The extracted content when it is truthy; otherwise *payload* itself.
        This means plain values (``str``, ``dict``, ``None``, ...) are returned
        unchanged, and so is an object whose extracted content is empty.
    """
    if hasattr(payload, "data"):
        extracted = payload.data
    elif hasattr(payload, "model_dump"):
        extracted = payload.model_dump()
    else:
        extracted = None
    # No dedicated dict/str/Data branch is needed: such a branch could only
    # select the payload itself or a ``data`` value that is already None, and
    # the fallback below yields exactly the same result.
    return extracted or payload
def build_output_logs(vertex, result) -> dict:
outputs: dict[str, OutputValue] = {}
component_instance = result[0]
for index, output in enumerate(vertex.outputs):
if component_instance.status is None:
payload = component_instance._results
output_result = payload.get(output["name"])
else:
payload = component_instance._artifacts
output_result = payload.get(output["name"], {}).get("raw")
message = get_message(output_result)
type_ = get_type(output_result)
match type_:
case LogType.STREAM if "stream_url" in message:
message = StreamURL(location=message["stream_url"])
case LogType.STREAM:
message = ""
case LogType.MESSAGE if hasattr(message, "message"):
message = message.message
case LogType.UNKNOWN:
message = ""
case LogType.ARRAY:
if isinstance(message, DataFrame):
message = message.to_dict(orient="records")
message = [recursive_serialize_or_str(item) for item in message]
name = output.get("name", f"output_{index}")
outputs |= {name: OutputValue(message=message, type=type_).model_dump()}
return outputs