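# Spaces: Running
# (The line above appears to be hosting-page status text captured together with
# this file; it is not part of the module.)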
from __future__ import annotations

import collections.abc as c
import sys
import typing as t
import weakref
from collections import defaultdict
from contextlib import contextmanager
from functools import cached_property
from inspect import iscoroutinefunction

from ._utilities import make_id
from ._utilities import make_ref
from ._utilities import Symbol
# Type variable bound to callables. ``Signal.connect`` and the ``connect_via``
# decorator are annotated with it so they return the same callable type they
# were given.
F = t.TypeVar("F", bound=c.Callable[..., t.Any])

ANY = Symbol("ANY")
"""Symbol for "any sender"."""

# Key under which receivers connected to ``ANY`` are stored in the per-sender
# bookkeeping, in place of an id derived from a real sender object.
ANY_ID = 0
class Signal:
    """A notification emitter.

    :param doc: The docstring for the signal.
    """

    ANY = ANY
    """An alias for the :data:`~blinker.ANY` sender symbol."""

    set_class: type[set[t.Any]] = set
    """The set class to use for tracking connected receivers and senders.
    Python's ``set`` is unordered. If receivers must be dispatched in the order
    they were connected, an ordered set implementation can be used.

    .. versionadded:: 1.7
    """

    # ``cached_property`` creates the helper signal on first access and stores
    # it in the instance ``__dict__``. :meth:`connect` checks for that key so
    # it never creates the helper signal just to find that nobody listens.
    @cached_property
    def receiver_connected(self) -> Signal:
        """Emitted at the end of each :meth:`connect` call.

        The signal sender is the signal instance, and the :meth:`connect`
        arguments are passed through: ``receiver``, ``sender``, and ``weak``.

        .. versionadded:: 1.2
        """
        return Signal(doc="Emitted after a receiver connects.")

    # Lazily created for the same reason as ``receiver_connected``;
    # :meth:`disconnect` checks the instance ``__dict__`` before using it.
    @cached_property
    def receiver_disconnected(self) -> Signal:
        """Emitted at the end of each :meth:`disconnect` call.

        The sender is the signal instance, and the :meth:`disconnect` arguments
        are passed through: ``receiver`` and ``sender``.

        This signal is emitted **only** when :meth:`disconnect` is called
        explicitly. This signal cannot be emitted by an automatic disconnect
        when a weakly referenced receiver or sender goes out of scope, as the
        instance is no longer available to be used as the sender for this
        signal.

        An alternative approach is available by subscribing to
        :attr:`receiver_connected` and setting up a custom weakref cleanup
        callback on weak receivers and senders.

        .. versionadded:: 1.2
        """
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc: str | None = None) -> None:
        if doc:
            self.__doc__ = doc

        self.receivers: dict[
            t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any]
        ] = {}
        """The map of connected receivers. Useful to quickly check if any
        receivers are connected to the signal: ``if s.receivers:``. The
        structure and data is not part of the public API, but checking its
        boolean value is.
        """

        # While true, ``send`` and ``send_async`` return an empty list without
        # calling any receiver.
        self.is_muted: bool = False
        # receiver id -> ids of the senders that receiver is connected to.
        self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
        # sender id (or ``ANY_ID``) -> ids of the receivers connected to it.
        self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
        # sender id -> weak reference whose callback drops that sender's
        # bookkeeping once the sender is garbage collected.
        self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {}

    def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F:
        """Connect ``receiver`` to be called when the signal is sent by
        ``sender``.

        :param receiver: The callable to call when :meth:`send` is called with
            the given ``sender``, passing ``sender`` as a positional argument
            along with any extra keyword arguments.
        :param sender: Any object or :data:`ANY`. ``receiver`` will only be
            called when :meth:`send` is called with this sender. If ``ANY``, the
            receiver will be called for any sender. A receiver may be connected
            to multiple senders by calling :meth:`connect` multiple times.
        :param weak: Track the receiver with a :mod:`weakref`. The receiver will
            be automatically disconnected when it is garbage collected. When
            connecting a receiver defined within a function, set to ``False``,
            otherwise it will be disconnected when the function scope ends.
        :return: ``receiver``, unchanged.
        """
        receiver_id = make_id(receiver)
        sender_id = ANY_ID if sender is ANY else make_id(sender)

        if weak:
            self.receivers[receiver_id] = make_ref(
                receiver, self._make_cleanup_receiver(receiver_id)
            )
        else:
            self.receivers[receiver_id] = receiver

        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)

        if sender is not ANY and sender_id not in self._weak_senders:
            # store a cleanup for weakref-able senders
            try:
                self._weak_senders[sender_id] = make_ref(
                    sender, self._make_cleanup_sender(sender_id)
                )
            except TypeError:
                # Deliberate best effort: the sender cannot be weakly
                # referenced, so no automatic cleanup is registered for it.
                pass

        # Only notify if the helper signal has already been created (it lives
        # in ``__dict__`` once accessed) and something is connected to it.
        if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
            try:
                self.receiver_connected.send(
                    self, receiver=receiver, sender=sender, weak=weak
                )
            except TypeError:
                # TODO no explanation or test for this
                self.disconnect(receiver, sender)
                raise

        return receiver

    def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]:
        """Connect the decorated function to be called when the signal is sent
        by ``sender``.

        The decorated function will be called when :meth:`send` is called with
        the given ``sender``, passing ``sender`` as a positional argument along
        with any extra keyword arguments.

        :param sender: Any object or :data:`ANY`. ``receiver`` will only be
            called when :meth:`send` is called with this sender. If ``ANY``, the
            receiver will be called for any sender. A receiver may be connected
            to multiple senders by calling :meth:`connect` multiple times.
        :param weak: Track the receiver with a :mod:`weakref`. The receiver will
            be automatically disconnected when it is garbage collected. When
            connecting a receiver defined within a function, set to ``False``,
            otherwise it will be disconnected when the function scope ends.

        .. versionadded:: 1.1
        """

        def decorator(fn: F) -> F:
            self.connect(fn, sender, weak)
            return fn

        return decorator

    @contextmanager
    def connected_to(
        self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
    ) -> c.Generator[None, None, None]:
        """A context manager that temporarily connects ``receiver`` to the
        signal while a ``with`` block executes. When the block exits, the
        receiver is disconnected. Useful for tests.

        :param receiver: The callable to call when :meth:`send` is called with
            the given ``sender``, passing ``sender`` as a positional argument
            along with any extra keyword arguments.
        :param sender: Any object or :data:`ANY`. ``receiver`` will only be
            called when :meth:`send` is called with this sender. If ``ANY``, the
            receiver will be called for any sender.

        .. versionadded:: 1.1
        """
        # Connected strongly so the receiver stays connected for the whole
        # block even if the caller holds no other reference to it.
        self.connect(receiver, sender=sender, weak=False)

        try:
            yield None
        finally:
            # Disconnects the receiver from every sender, not only ``sender``.
            self.disconnect(receiver)

    @contextmanager
    def muted(self) -> c.Generator[None, None, None]:
        """A context manager that temporarily disables the signal. No receivers
        will be called if the signal is sent, until the ``with`` block exits.
        Useful for tests.
        """
        self.is_muted = True

        try:
            yield None
        finally:
            self.is_muted = False

    def send(
        self,
        sender: t.Any | None = None,
        /,
        *,
        _async_wrapper: c.Callable[
            [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any]
        ]
        | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
        """Call all receivers that are connected to the given ``sender``
        or :data:`ANY`. Each receiver is called with ``sender`` as a positional
        argument along with any extra keyword arguments. Return a list of
        ``(receiver, return value)`` tuples.

        The order receivers are called is undefined, but can be influenced by
        setting :attr:`set_class`.

        If a receiver raises an exception, that exception will propagate up.
        This makes debugging straightforward, with an assumption that correctly
        implemented receivers will not raise.

        :param sender: Call receivers connected to this sender, in addition to
            those connected to :data:`ANY`.
        :param _async_wrapper: Will be called on any receivers that are async
            coroutines to turn them into sync callables. For example, could run
            the receiver with an event loop.
        :param kwargs: Extra keyword arguments to pass to each receiver.
        :raises RuntimeError: If a receiver is a coroutine function and no
            ``_async_wrapper`` was given.

        .. versionchanged:: 1.7
            Added the ``_async_wrapper`` argument.
        """
        if self.is_muted:
            return []

        results = []

        for receiver in self.receivers_for(sender):
            if iscoroutinefunction(receiver):
                if _async_wrapper is None:
                    raise RuntimeError("Cannot send to a coroutine function.")

                result = _async_wrapper(receiver)(sender, **kwargs)
            else:
                result = receiver(sender, **kwargs)

            results.append((receiver, result))

        return results

    async def send_async(
        self,
        sender: t.Any | None = None,
        /,
        *,
        _sync_wrapper: c.Callable[
            [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]
        ]
        | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
        """Await all receivers that are connected to the given ``sender``
        or :data:`ANY`. Each receiver is called with ``sender`` as a positional
        argument along with any extra keyword arguments. Return a list of
        ``(receiver, return value)`` tuples.

        The order receivers are called is undefined, but can be influenced by
        setting :attr:`set_class`.

        If a receiver raises an exception, that exception will propagate up.
        This makes debugging straightforward, with an assumption that correctly
        implemented receivers will not raise.

        :param sender: Call receivers connected to this sender, in addition to
            those connected to :data:`ANY`.
        :param _sync_wrapper: Will be called on any receivers that are sync
            callables to turn them into async coroutines. For example,
            could call the receiver in a thread.
        :param kwargs: Extra keyword arguments to pass to each receiver.
        :raises RuntimeError: If a receiver is not a coroutine function and no
            ``_sync_wrapper`` was given.

        .. versionadded:: 1.7
        """
        if self.is_muted:
            return []

        results = []

        for receiver in self.receivers_for(sender):
            if not iscoroutinefunction(receiver):
                if _sync_wrapper is None:
                    raise RuntimeError("Cannot send to a non-coroutine function.")

                result = await _sync_wrapper(receiver)(sender, **kwargs)
            else:
                result = await receiver(sender, **kwargs)

            results.append((receiver, result))

        return results

    def has_receivers_for(self, sender: t.Any) -> bool:
        """Check if there is at least one receiver that will be called with the
        given ``sender``. A receiver connected to :data:`ANY` will always be
        called, regardless of sender. Does not check if weakly referenced
        receivers are still live. See :meth:`receivers_for` for a stronger
        search.

        :param sender: Check for receivers connected to this sender, in addition
            to those connected to :data:`ANY`.
        """
        if not self.receivers:
            return False

        if self._by_sender[ANY_ID]:
            return True

        if sender is ANY:
            return False

        # Look at the bucket's contents rather than key presence. Disconnecting
        # a receiver leaves its sender's bucket behind as an empty set, which
        # must not be reported as "has receivers".
        return bool(self._by_sender.get(make_id(sender)))

    def receivers_for(
        self, sender: t.Any
    ) -> c.Generator[c.Callable[..., t.Any], None, None]:
        """Yield each receiver to be called for ``sender``, in addition to those
        to be called for :data:`ANY`. Weakly referenced receivers that are not
        live will be disconnected and skipped.

        :param sender: Yield receivers connected to this sender, in addition
            to those connected to :data:`ANY`.
        """
        # TODO: test receivers_for(ANY)
        if not self.receivers:
            return

        sender_id = make_id(sender)

        # Both branches build a new set, so disconnecting stale receivers
        # below cannot disturb the iteration.
        if sender_id in self._by_sender:
            ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
        else:
            ids = self._by_sender[ANY_ID].copy()

        for receiver_id in ids:
            receiver = self.receivers.get(receiver_id)

            if receiver is None:
                continue

            if isinstance(receiver, weakref.ref):
                strong = receiver()

                if strong is None:
                    # The referent is gone; drop it from every sender.
                    self._disconnect(receiver_id, ANY_ID)
                    continue

                yield strong
            else:
                yield receiver

    def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None:
        """Disconnect ``receiver`` from being called when the signal is sent by
        ``sender``.

        :param receiver: A connected receiver callable.
        :param sender: Disconnect from only this sender. By default, disconnect
            from all senders.
        """
        sender_id: c.Hashable

        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = make_id(sender)

        receiver_id = make_id(receiver)
        self._disconnect(receiver_id, sender_id)

        # Only notify if the helper signal has already been created and
        # something is connected to it.
        if (
            "receiver_disconnected" in self.__dict__
            and self.receiver_disconnected.receivers
        ):
            self.receiver_disconnected.send(self, receiver=receiver, sender=sender)

    def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None:
        """Remove bookkeeping for a receiver.

        With ``ANY_ID`` as ``sender_id`` the receiver is removed from every
        sender bucket and from :attr:`receivers`. With any other sender id only
        that sender/receiver pairing is removed; the entry in
        :attr:`receivers` is left in place.
        """
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, None) is not None:
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)

            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _make_cleanup_receiver(
        self, receiver_id: c.Hashable
    ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]:
        """Create a callback function to disconnect a weakly referenced
        receiver when it is garbage collected.
        """

        def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None:
            # If the interpreter is shutting down, disconnecting can result in a
            # weird ignored exception. Don't call it in that case.
            if not sys.is_finalizing():
                self._disconnect(receiver_id, ANY_ID)

        return cleanup

    def _make_cleanup_sender(
        self, sender_id: c.Hashable
    ) -> c.Callable[[weakref.ref[t.Any]], None]:
        """Create a callback function to disconnect all receivers for a weakly
        referenced sender when it is garbage collected.
        """
        # Internal invariant: ``connect`` never registers a sender cleanup for
        # the ``ANY`` bucket.
        assert sender_id != ANY_ID

        def cleanup(ref: weakref.ref[t.Any]) -> None:
            self._weak_senders.pop(sender_id, None)

            for receiver_id in self._by_sender.pop(sender_id, ()):
                self._by_receiver[receiver_id].discard(sender_id)

        return cleanup

    def _cleanup_bookkeeping(self) -> None:
        """Prune unused sender/receiver bookkeeping. Not threadsafe.

        Connecting & disconnecting leaves behind a small amount of bookkeeping
        data. Typical workloads using Blinker, for example in most web apps,
        Flask, CLI scripts, etc., are not adversely affected by this
        bookkeeping.

        With a long-running process performing dynamic signal routing with high
        volume, e.g. connecting to function closures, senders are all unique
        object instances. Doing all of this over and over may cause memory usage
        to grow due to extraneous bookkeeping. (An empty ``set`` for each stale
        sender/receiver pair.)

        This method will prune that bookkeeping away, with the caveat that such
        pruning is not threadsafe. The risk is that cleanup of a fully
        disconnected receiver/sender pair occurs while another thread is
        connecting that same pair. If you are in the highly dynamic, unique
        receiver/sender situation that has led you to this method, that failure
        mode is perhaps not a big deal for you.
        """
        for mapping in (self._by_sender, self._by_receiver):
            # Iterate over a snapshot because entries are removed while looping.
            for ident, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(ident, None)

    def _clear_state(self) -> None:
        """Disconnect all receivers and senders. Useful for tests."""
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()
class NamedSignal(Signal):
    """A named generic notification emitter. The name is not used by the signal
    itself, but matches the key in the :class:`Namespace` that it belongs to.

    :param name: The name of the signal within the namespace.
    :param doc: The docstring for the signal.
    """

    def __init__(self, name: str, doc: str | None = None) -> None:
        super().__init__(doc)

        #: The name of this signal.
        self.name: str = name

    def __repr__(self) -> str:
        """Return the inherited repr with ``; '<name>'`` inserted before its
        final character (the closing ``>`` of the default object repr).
        """
        base = super().__repr__()
        return f"{base[:-1]}; {self.name!r}>"  # noqa: E702
class Namespace(dict[str, NamedSignal]):
    """A dict mapping names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` for the given ``name``, creating it
        if required. Repeated calls with the same name return the same signal.

        :param name: The name of the signal.
        :param doc: The docstring of the signal. Only used when the signal is
            created; ignored if a signal with this name already exists.
        """
        if name not in self:
            self[name] = NamedSignal(name, doc)

        return self[name]
class _PNamespaceSignal(t.Protocol):
    """Call signature used to annotate the module-level :data:`signal`
    callable: it takes a name and an optional docstring and returns a
    :class:`NamedSignal`.
    """

    def __call__(self, name: str, doc: str | None = None) -> NamedSignal: ...


default_namespace: Namespace = Namespace()
"""A default :class:`Namespace` for creating named signals. :func:`signal`
creates a :class:`NamedSignal` in this namespace.
"""

# Bound method of the default namespace, exposed as a module-level function.
signal: _PNamespaceSignal = default_namespace.signal
"""Return a :class:`NamedSignal` in :data:`default_namespace` with the given
``name``, creating it if required. Repeated calls with the same name return the
same signal.
"""