import asyncio import inspect import json import time import uuid from functools import partial from typing import Literal from fastapi.encoders import jsonable_encoder from loguru import logger from typing_extensions import Protocol from langflow.schema.log import LoggableType from langflow.schema.playground_events import create_event_by_type class EventCallback(Protocol): def __call__(self, *, manager: "EventManager", event_type: str, data: LoggableType): ... class PartialEventCallback(Protocol): def __call__(self, *, data: LoggableType): ... class EventManager: def __init__(self, queue: asyncio.Queue): self.queue = queue self.events: dict[str, PartialEventCallback] = {} @staticmethod def _validate_callback(callback: EventCallback) -> None: if not callable(callback): msg = "Callback must be callable" raise TypeError(msg) # Check if it has `self, event_type and data` sig = inspect.signature(callback) parameters = ["manager", "event_type", "data"] if len(sig.parameters) != len(parameters): msg = "Callback must have exactly 3 parameters" raise ValueError(msg) if not all(param.name in parameters for param in sig.parameters.values()): msg = "Callback must have exactly 3 parameters: manager, event_type, and data" raise ValueError(msg) def register_event( self, name: str, event_type: Literal["message", "error", "warning", "info", "token"], callback: EventCallback | None = None, ) -> None: if not name: msg = "Event name cannot be empty" raise ValueError(msg) if not name.startswith("on_"): msg = "Event name must start with 'on_'" raise ValueError(msg) if callback is None: callback_ = partial(self.send_event, event_type=event_type) else: callback_ = partial(callback, manager=self, event_type=event_type) self.events[name] = callback_ def send_event(self, *, event_type: Literal["message", "error", "warning", "info", "token"], data: LoggableType): try: if isinstance(data, dict) and event_type in {"message", "error", "warning", "info", "token"}: data = create_event_by_type(event_type, **data) except TypeError as e: logger.debug(f"Error creating playground event: {e}") except Exception: raise jsonable_data = jsonable_encoder(data) json_data = {"event": event_type, "data": jsonable_data} event_id = f"{event_type}-{uuid.uuid4()}" str_data = json.dumps(json_data) + "\n\n" self.queue.put_nowait((event_id, str_data.encode("utf-8"), time.time())) def noop(self, *, data: LoggableType) -> None: pass def __getattr__(self, name: str) -> PartialEventCallback: return self.events.get(name, self.noop) def create_default_event_manager(queue): manager = EventManager(queue) manager.register_event("on_token", "token") manager.register_event("on_vertices_sorted", "vertices_sorted") manager.register_event("on_error", "error") manager.register_event("on_end", "end") manager.register_event("on_message", "add_message") manager.register_event("on_remove_message", "remove_message") manager.register_event("on_end_vertex", "end_vertex") manager.register_event("on_build_start", "build_start") manager.register_event("on_build_end", "build_end") return manager