Spaces:
Runtime error
Runtime error
File size: 2,331 Bytes
f0fe0fd 9fd6e20 c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 9fd6e20 c6a2a56 bb7c9a3 c6a2a56 bb7c9a3 c21d29c f0fe0fd c6a2a56 f0fe0fd c6a2a56 f0fe0fd 9fd6e20 8ec2c5a c6a2a56 f0fe0fd 9fd6e20 8ec2c5a 9fd6e20 bb7c9a3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
from asyncio import create_task, iscoroutinefunction, Lock, to_thread
from collections import defaultdict
from loguru import logger
from pydantic import ConfigDict, PrivateAttr
from typing import Any, Callable, List, MutableMapping, Self
from ctp_slack_bot.core import ApplicationComponentBase
from ctp_slack_bot.enums import EventType
class EventBrokerageService(ApplicationComponentBase):
"""
Service for brokering events between services.
"""
model_config = ConfigDict(frozen=True)
__write_lock: Lock = PrivateAttr(default_factory=Lock)
__subscribers: MutableMapping[EventType, tuple[Callable]] = PrivateAttr(default_factory=lambda: defaultdict(tuple))
async def subscribe(self: Self, type: EventType, callback: Callable) -> None:
"""Subscribe to an event type with a callback function."""
async with self.__write_lock:
subscribers = self.__subscribers[type]
self.__subscribers[type] = subscribers + (callback, )
logger.debug("One new subscriber was added for event type {} ({} subscriber(s) in total).", type, len(subscribers))
async def publish(self: Self, type: EventType, data: Any = None) -> None:
"""Publish an event with optional data to all subscribers."""
subscribers = self.__subscribers[type]
if not subscribers:
logger.debug("No subscribers handle event {}: {}", type, len(subscribers), data)
return
logger.debug("Broadcasting event {} to {} subscriber(s): {}", type, len(subscribers), data)
for callback in subscribers:
if iscoroutinefunction(callback):
task = create_task(callback(data))
task.add_done_callback(lambda done_task: logger.error("Error in asynchronous event callback handling event {}: {}", type, done_task.exception())
if done_task.exception()
else None)
else:
try:
create_task(to_thread(callback, data))
except Exception as e:
logger.error("Error scheduling synchronous callback to handle event {}: {}", type, e)
@property
def name(self: Self) -> str:
return "event_brokerage_service"
|