File size: 2,331 Bytes
f0fe0fd
9fd6e20
c6a2a56
bb7c9a3
 
c6a2a56
bb7c9a3
9fd6e20
c6a2a56
bb7c9a3
 
c6a2a56
 
 
 
bb7c9a3
c21d29c
f0fe0fd
 
c6a2a56
f0fe0fd
c6a2a56
f0fe0fd
 
 
 
9fd6e20
8ec2c5a
c6a2a56
f0fe0fd
9fd6e20
 
 
 
 
 
8ec2c5a
 
9fd6e20
 
 
 
 
 
 
bb7c9a3
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from asyncio import create_task, iscoroutinefunction, Lock, Task, to_thread
from collections import defaultdict
from typing import Any, Callable, List, MutableMapping, Self

from loguru import logger
from pydantic import ConfigDict, PrivateAttr

from ctp_slack_bot.core import ApplicationComponentBase
from ctp_slack_bot.enums import EventType


class EventBrokerageService(ApplicationComponentBase):
    """
    Service for brokering events between services.

    Callbacks are registered per event type with ``subscribe``. ``publish``
    schedules every callback registered for the event type without awaiting
    it: coroutine functions become tasks on the running event loop, and plain
    callables are run through ``asyncio.to_thread``. Failures raised by a
    callback are logged rather than propagated to the publisher.
    """

    model_config = ConfigDict(frozen=True)

    # Serializes updates to the subscriber registry.
    __write_lock: Lock = PrivateAttr(default_factory=Lock)
    # Per-type subscriber tuples; a tuple is replaced on subscribe, never mutated,
    # so publish can iterate the one it read without taking the lock.
    __subscribers: MutableMapping[EventType, tuple[Callable, ...]] = PrivateAttr(default_factory=lambda: defaultdict(tuple))
    # Strong references to in-flight callback tasks. The event loop only keeps
    # weak references, so an unreferenced task may be garbage-collected mid-run.
    __background_tasks: set[Task] = PrivateAttr(default_factory=set)

    async def subscribe(self: Self, type: EventType, callback: Callable) -> None:
        """
        Subscribe to an event type with a callback function.

        Args:
            type: The event type to listen for.
            callback: A coroutine function or plain callable that is invoked
                with the published data as its only argument.
        """
        async with self.__write_lock:
            updated = self.__subscribers[type] + (callback, )
            self.__subscribers[type] = updated
            # Report the size after the insert so "in total" is accurate.
            logger.debug("One new subscriber was added for event type {} ({} subscriber(s) in total).", type, len(updated))

    async def publish(self: Self, type: EventType, data: Any = None) -> None:
        """
        Publish an event with optional data to all subscribers.

        Callbacks are scheduled, not awaited; this method returns as soon as
        every callback has been handed to the event loop.

        Args:
            type: The event type being published.
            data: The payload passed to each subscriber callback.
        """
        # Use .get so a read does not insert an empty entry into the defaultdict
        # outside of the write lock.
        subscribers = self.__subscribers.get(type, ())
        if not subscribers:
            logger.debug("No subscribers handle event {}: {}", type, data)
            return
        logger.debug("Broadcasting event {} to {} subscriber(s): {}", type, len(subscribers), data)
        for callback in subscribers:
            if iscoroutinefunction(callback):
                self._track_task(create_task(callback(data)), type, "asynchronous")
                continue
            try:
                task = create_task(to_thread(callback, data))
            except Exception as e:
                logger.error("Error scheduling synchronous callback to handle event {}: {}", type, e)
                continue
            self._track_task(task, type, "synchronous")

    def _track_task(self: Self, task: Task, type: EventType, kind: str) -> None:
        """Keep a callback task alive until it finishes and arrange for its outcome to be logged."""
        self.__background_tasks.add(task)
        task.add_done_callback(lambda done_task: self._finish_task(done_task, type, kind))

    def _finish_task(self: Self, done_task: Task, type: EventType, kind: str) -> None:
        """Release a finished callback task and log the exception it raised, if any."""
        self.__background_tasks.discard(done_task)
        # Task.exception() raises CancelledError for a cancelled task, so check first.
        if done_task.cancelled():
            return
        error = done_task.exception()
        if error is not None:
            logger.error("Error in {} event callback handling event {}: {}", kind, type, error)

    @property
    def name(self: Self) -> str:
        """Return the identifier of this service."""
        return "event_brokerage_service"