# ctp-slack-bot/src/ctp_slack_bot/services/event_brokerage_service.py
# LiKenun's picture
# Refactor #6
# f0fe0fd
from asyncio import create_task, iscoroutinefunction, Lock, to_thread
from collections import defaultdict
from loguru import logger
from pydantic import ConfigDict, PrivateAttr
from typing import Any, Callable, List, MutableMapping, Self
from ctp_slack_bot.core import ApplicationComponentBase
from ctp_slack_bot.enums import EventType
class EventBrokerageService(ApplicationComponentBase):
    """
    Service for brokering events between services.

    Callbacks are registered per event type with ``subscribe`` and invoked by
    ``publish``. Coroutine functions are scheduled as tasks on the running
    event loop; plain callables are run in a worker thread via ``to_thread``.
    ``publish`` does not wait for callbacks to finish; failures inside a
    callback are logged rather than raised to the publisher.
    """

    model_config = ConfigDict(frozen=True)

    # Serializes the read-modify-write of a subscriber tuple in ``subscribe``.
    __write_lock: Lock = PrivateAttr(default_factory=Lock)
    # Event type -> tuple of callbacks. Tuples are replaced, never mutated, so
    # ``publish`` can iterate the tuple it fetched without taking the lock.
    __subscribers: MutableMapping[EventType, tuple[Callable, ...]] = PrivateAttr(default_factory=lambda: defaultdict(tuple))
    # Strong references to in-flight callback tasks. The asyncio documentation
    # recommends saving a reference to each task created with ``create_task``
    # so that it cannot disappear mid-execution.
    __pending_tasks: set = PrivateAttr(default_factory=set)

    async def subscribe(self: Self, type: EventType, callback: Callable) -> None:
        """Subscribe to an event type with a callback function.

        Args:
            type: The event type to listen for.
            callback: A callable or coroutine function that accepts the
                published data as its single positional argument.
        """
        async with self.__write_lock:
            subscribers = self.__subscribers[type] + (callback, )
            self.__subscribers[type] = subscribers
            # Report the size of the *updated* tuple so the total includes the new subscriber.
            logger.debug("One new subscriber was added for event type {} ({} subscriber(s) in total).", type, len(subscribers))

    async def publish(self: Self, type: EventType, data: Any = None) -> None:
        """Publish an event with optional data to all subscribers.

        Args:
            type: The event type being published.
            data: Payload passed to every subscriber callback.
        """
        # ``get`` avoids inserting an empty defaultdict entry for event types
        # that are published but never subscribed to.
        subscribers = self.__subscribers.get(type, ())
        if not subscribers:
            logger.debug("No subscribers handle event {}: {}", type, data)
            return
        logger.debug("Broadcasting event {} to {} subscriber(s): {}", type, len(subscribers), data)
        for callback in subscribers:
            if iscoroutinefunction(callback):
                self.__run_in_background(
                    lambda callback=callback: callback(data),
                    type,
                    "Error scheduling asynchronous callback to handle event {}: {}",
                    "Error in asynchronous event callback handling event {}: {}",
                )
            else:
                self.__run_in_background(
                    lambda callback=callback: to_thread(callback, data),
                    type,
                    "Error scheduling synchronous callback to handle event {}: {}",
                    "Error in synchronous event callback handling event {}: {}",
                )

    def __run_in_background(self: Self, make_awaitable: Callable, type: EventType, scheduling_error_message: str, callback_error_message: str) -> None:
        """Schedule one subscriber invocation as a task and log any failure.

        Args:
            make_awaitable: Zero-argument factory returning the coroutine to run.
            type: The event type being handled; used only in log messages.
            scheduling_error_message: Log template used if creating the task fails.
            callback_error_message: Log template used if the task finishes with an exception.
        """
        try:
            task = create_task(make_awaitable())
        except Exception as e:
            # A subscriber that cannot be scheduled must not stop the remaining subscribers.
            logger.error(scheduling_error_message, type, e)
            return

        pending_tasks = self.__pending_tasks
        pending_tasks.add(task)

        def on_done(done_task) -> None:
            pending_tasks.discard(done_task)
            # ``Task.exception()`` raises CancelledError for a cancelled task, so check first.
            if done_task.cancelled():
                return
            error = done_task.exception()
            if error is not None:
                logger.error(callback_error_message, type, error)

        task.add_done_callback(on_done)

    @property
    def name(self: Self) -> str:
        """Return the identifier of this component."""
        return "event_brokerage_service"