Spaces:
Running
Running
import inspect | |
from collections.abc import Callable | |
from datetime import datetime, timezone | |
from typing import Annotated, Literal | |
from uuid import UUID | |
from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator | |
from langflow.schema.content_block import ContentBlock | |
from langflow.schema.content_types import ErrorContent | |
from langflow.schema.properties import Properties | |
from langflow.schema.validators import timestamp_to_str_validator | |
from langflow.utils.constants import MESSAGE_SENDER_USER | |
class PlaygroundEvent(BaseModel): | |
model_config = ConfigDict(extra="allow", populate_by_name=True) | |
properties: Properties | None = Field(default=None) | |
sender_name: str | None = Field(default=None) | |
content_blocks: list[ContentBlock] | None = Field(default=None) | |
format_type: Literal["default", "error", "warning", "info"] = Field(default="default") | |
files: list[str] | None = Field(default=None) | |
text: str | None = Field(default=None) | |
timestamp: Annotated[str, timestamp_to_str_validator] = Field( | |
default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") | |
) | |
id_: UUID | str | None = Field(default=None, alias="id") | |
def serialize_timestamp(cls, v: str) -> str: | |
return v | |
def validate_id(cls, v: UUID | str | None) -> str | None: | |
if isinstance(v, UUID): | |
return str(v) | |
return v | |
class MessageEvent(PlaygroundEvent): | |
category: Literal["message", "error", "warning", "info"] = "message" | |
format_type: Literal["default", "error", "warning", "info"] = Field(default="default") | |
session_id: str | None = Field(default=None) | |
error: bool = Field(default=False) | |
edit: bool = Field(default=False) | |
flow_id: UUID | str | None = Field(default=None) | |
sender: str = Field(default=MESSAGE_SENDER_USER) | |
sender_name: str = Field(default="User") | |
def validate_flow_id(cls, v: UUID | str | None) -> str | None: | |
if isinstance(v, UUID): | |
return str(v) | |
return v | |
class ErrorEvent(MessageEvent): | |
background_color: str = Field(default="#FF0000") | |
text_color: str = Field(default="#FFFFFF") | |
format_type: Literal["default", "error", "warning", "info"] = Field(default="error") | |
allow_markdown: bool = Field(default=False) | |
category: Literal["error"] = "error" | |
class WarningEvent(PlaygroundEvent): | |
background_color: str = Field(default="#FFA500") | |
text_color: str = Field(default="#000000") | |
format_type: Literal["default", "error", "warning", "info"] = Field(default="warning") | |
class InfoEvent(PlaygroundEvent): | |
background_color: str = Field(default="#0000FF") | |
text_color: str = Field(default="#FFFFFF") | |
format_type: Literal["default", "error", "warning", "info"] = Field(default="info") | |
class TokenEvent(BaseModel): | |
chunk: str = Field(...) | |
id: UUID | str | None = Field(alias="id") | |
timestamp: Annotated[str, timestamp_to_str_validator] = Field( | |
default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") | |
) | |
# Factory functions first | |
def create_message( | |
text: str, | |
category: Literal["message", "error", "warning", "info"] = "message", | |
properties: dict | None = None, | |
content_blocks: list[ContentBlock] | None = None, | |
sender_name: str | None = None, | |
files: list[str] | None = None, | |
timestamp: str | None = None, | |
format_type: Literal["default", "error", "warning", "info"] = "default", | |
sender: str | None = None, | |
session_id: str | None = None, | |
id: UUID | str | None = None, # noqa: A002 | |
flow_id: UUID | str | None = None, | |
*, | |
error: bool = False, | |
edit: bool = False, | |
) -> MessageEvent: | |
return MessageEvent( | |
text=text, | |
properties=properties, | |
category=category, | |
content_blocks=content_blocks, | |
sender_name=sender_name, | |
files=files, | |
timestamp=timestamp, | |
format_type=format_type, | |
sender=sender, | |
id=id, | |
session_id=session_id, | |
error=error, | |
edit=edit, | |
flow_id=flow_id, | |
) | |
def create_error( | |
text: str, | |
properties: dict | None = None, | |
traceback: str | None = None, | |
title: str = "Error", | |
timestamp: str | None = None, | |
id: UUID | str | None = None, # noqa: A002 | |
flow_id: UUID | str | None = None, | |
session_id: str | None = None, | |
content_blocks: list[ContentBlock] | None = None, | |
) -> ErrorEvent: | |
if traceback: | |
content_blocks = content_blocks or [] | |
content_blocks += [ContentBlock(title=title, contents=[ErrorContent(type="error", traceback=traceback)])] | |
return ErrorEvent( | |
text=text, | |
properties=properties, | |
content_blocks=content_blocks, | |
timestamp=timestamp, | |
id=id, | |
flow_id=flow_id, | |
session_id=session_id, | |
) | |
def create_warning(message: str) -> WarningEvent: | |
return WarningEvent(text=message) | |
def create_info(message: str) -> InfoEvent: | |
return InfoEvent(text=message) | |
def create_token(chunk: str, id: str) -> TokenEvent: # noqa: A002 | |
return TokenEvent( | |
chunk=chunk, | |
id=id, | |
) | |
_EVENT_CREATORS: dict[str, tuple[Callable, inspect.Signature]] = { | |
"message": (create_message, inspect.signature(create_message)), | |
"error": (create_error, inspect.signature(create_error)), | |
"warning": (create_warning, inspect.signature(create_warning)), | |
"info": (create_info, inspect.signature(create_info)), | |
"token": (create_token, inspect.signature(create_token)), | |
} | |
def create_event_by_type( | |
event_type: Literal["message", "error", "warning", "info", "token"], **kwargs | |
) -> PlaygroundEvent | dict: | |
if event_type not in _EVENT_CREATORS: | |
return kwargs | |
creator_func, signature = _EVENT_CREATORS[event_type] | |
valid_params = {k: v for k, v in kwargs.items() if k in signature.parameters} | |
return creator_func(**valid_params) | |