Spaces:
Runtime error
Runtime error
from asyncio import create_task, iscoroutinefunction, Lock, to_thread | |
from collections import defaultdict | |
from loguru import logger | |
from pydantic import ConfigDict, PrivateAttr | |
from typing import Any, Callable, List, MutableMapping, Self | |
from ctp_slack_bot.core import ApplicationComponentBase | |
from ctp_slack_bot.enums import EventType | |
class EventBrokerageService(ApplicationComponentBase): | |
""" | |
Service for brokering events between services. | |
""" | |
model_config = ConfigDict(frozen=True) | |
__write_lock: Lock = PrivateAttr(default_factory=Lock) | |
__subscribers: MutableMapping[EventType, tuple[Callable]] = PrivateAttr(default_factory=lambda: defaultdict(tuple)) | |
async def subscribe(self: Self, type: EventType, callback: Callable) -> None: | |
"""Subscribe to an event type with a callback function.""" | |
async with self.__write_lock: | |
subscribers = self.__subscribers[type] | |
self.__subscribers[type] = subscribers + (callback, ) | |
logger.debug("One new subscriber was added for event type {} ({} subscriber(s) in total).", type, len(subscribers)) | |
async def publish(self: Self, type: EventType, data: Any = None) -> None: | |
"""Publish an event with optional data to all subscribers.""" | |
subscribers = self.__subscribers[type] | |
if not subscribers: | |
logger.debug("No subscribers handle event {}: {}", type, len(subscribers), data) | |
return | |
logger.debug("Broadcasting event {} to {} subscriber(s): {}", type, len(subscribers), data) | |
for callback in subscribers: | |
if iscoroutinefunction(callback): | |
task = create_task(callback(data)) | |
task.add_done_callback(lambda done_task: logger.error("Error in asynchronous event callback handling event {}: {}", type, done_task.exception()) | |
if done_task.exception() | |
else None) | |
else: | |
try: | |
create_task(to_thread(callback, data)) | |
except Exception as e: | |
logger.error("Error scheduling synchronous callback to handle event {}: {}", type, e) | |
def name(self: Self) -> str: | |
return "event_brokerage_service" | |