agent-flow / src /backend /base /langflow /schema /playground_events.py
Tai Truong
fix readme
d202ada
import inspect
from collections.abc import Callable
from datetime import datetime, timezone
from typing import Annotated, Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
from langflow.schema.content_block import ContentBlock
from langflow.schema.content_types import ErrorContent
from langflow.schema.properties import Properties
from langflow.schema.validators import timestamp_to_str_validator
from langflow.utils.constants import MESSAGE_SENDER_USER
class PlaygroundEvent(BaseModel):
model_config = ConfigDict(extra="allow", populate_by_name=True)
properties: Properties | None = Field(default=None)
sender_name: str | None = Field(default=None)
content_blocks: list[ContentBlock] | None = Field(default=None)
format_type: Literal["default", "error", "warning", "info"] = Field(default="default")
files: list[str] | None = Field(default=None)
text: str | None = Field(default=None)
timestamp: Annotated[str, timestamp_to_str_validator] = Field(
default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
)
id_: UUID | str | None = Field(default=None, alias="id")
@field_serializer("timestamp")
@classmethod
def serialize_timestamp(cls, v: str) -> str:
return v
@field_validator("id_")
@classmethod
def validate_id(cls, v: UUID | str | None) -> str | None:
if isinstance(v, UUID):
return str(v)
return v
class MessageEvent(PlaygroundEvent):
category: Literal["message", "error", "warning", "info"] = "message"
format_type: Literal["default", "error", "warning", "info"] = Field(default="default")
session_id: str | None = Field(default=None)
error: bool = Field(default=False)
edit: bool = Field(default=False)
flow_id: UUID | str | None = Field(default=None)
sender: str = Field(default=MESSAGE_SENDER_USER)
sender_name: str = Field(default="User")
@field_validator("flow_id")
@classmethod
def validate_flow_id(cls, v: UUID | str | None) -> str | None:
if isinstance(v, UUID):
return str(v)
return v
class ErrorEvent(MessageEvent):
background_color: str = Field(default="#FF0000")
text_color: str = Field(default="#FFFFFF")
format_type: Literal["default", "error", "warning", "info"] = Field(default="error")
allow_markdown: bool = Field(default=False)
category: Literal["error"] = "error"
class WarningEvent(PlaygroundEvent):
background_color: str = Field(default="#FFA500")
text_color: str = Field(default="#000000")
format_type: Literal["default", "error", "warning", "info"] = Field(default="warning")
class InfoEvent(PlaygroundEvent):
background_color: str = Field(default="#0000FF")
text_color: str = Field(default="#FFFFFF")
format_type: Literal["default", "error", "warning", "info"] = Field(default="info")
class TokenEvent(BaseModel):
chunk: str = Field(...)
id: UUID | str | None = Field(alias="id")
timestamp: Annotated[str, timestamp_to_str_validator] = Field(
default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
)
# Factory functions first
def create_message(
text: str,
category: Literal["message", "error", "warning", "info"] = "message",
properties: dict | None = None,
content_blocks: list[ContentBlock] | None = None,
sender_name: str | None = None,
files: list[str] | None = None,
timestamp: str | None = None,
format_type: Literal["default", "error", "warning", "info"] = "default",
sender: str | None = None,
session_id: str | None = None,
id: UUID | str | None = None, # noqa: A002
flow_id: UUID | str | None = None,
*,
error: bool = False,
edit: bool = False,
) -> MessageEvent:
return MessageEvent(
text=text,
properties=properties,
category=category,
content_blocks=content_blocks,
sender_name=sender_name,
files=files,
timestamp=timestamp,
format_type=format_type,
sender=sender,
id=id,
session_id=session_id,
error=error,
edit=edit,
flow_id=flow_id,
)
def create_error(
text: str,
properties: dict | None = None,
traceback: str | None = None,
title: str = "Error",
timestamp: str | None = None,
id: UUID | str | None = None, # noqa: A002
flow_id: UUID | str | None = None,
session_id: str | None = None,
content_blocks: list[ContentBlock] | None = None,
) -> ErrorEvent:
if traceback:
content_blocks = content_blocks or []
content_blocks += [ContentBlock(title=title, contents=[ErrorContent(type="error", traceback=traceback)])]
return ErrorEvent(
text=text,
properties=properties,
content_blocks=content_blocks,
timestamp=timestamp,
id=id,
flow_id=flow_id,
session_id=session_id,
)
def create_warning(message: str) -> WarningEvent:
return WarningEvent(text=message)
def create_info(message: str) -> InfoEvent:
return InfoEvent(text=message)
def create_token(chunk: str, id: str) -> TokenEvent: # noqa: A002
return TokenEvent(
chunk=chunk,
id=id,
)
_EVENT_CREATORS: dict[str, tuple[Callable, inspect.Signature]] = {
"message": (create_message, inspect.signature(create_message)),
"error": (create_error, inspect.signature(create_error)),
"warning": (create_warning, inspect.signature(create_warning)),
"info": (create_info, inspect.signature(create_info)),
"token": (create_token, inspect.signature(create_token)),
}
def create_event_by_type(
event_type: Literal["message", "error", "warning", "info", "token"], **kwargs
) -> PlaygroundEvent | dict:
if event_type not in _EVENT_CREATORS:
return kwargs
creator_func, signature = _EVENT_CREATORS[event_type]
valid_params = {k: v for k, v in kwargs.items() if k in signature.parameters}
return creator_func(**valid_params)