from __future__ import annotations

import logging
import sys

import tornado

# Two-way lookup table between logging level names and numeric levels:
# it is filled from both of the logging module's private mappings, so it can
# be indexed by either a name or a number.
logging_names: dict[str | int, int | str] = {}
logging_names.update(logging._levelToName)  # type: ignore
logging_names.update(logging._nameToLevel)  # type: ignore

# Platform flags derived from ``sys.platform``.
LINUX = sys.platform == "linux"
MACOS = sys.platform == "darwin"
WINDOWS = sys.platform == "win32"


if sys.version_info >= (3, 9):
    from asyncio import to_thread
else:
    import contextvars
    import functools
    from asyncio import events

    async def to_thread(func, /, *args, **kwargs):
        """Asynchronously run function *func* in a separate thread.

        Any *args and **kwargs supplied for this function are directly passed
        to *func*. Also, the current :class:`contextvars.Context` is propagated,
        allowing context variables from the main thread to be accessed in the
        separate thread.

        Return a coroutine that can be awaited to get the eventual result of *func*.

        backport from
        https://github.com/python/cpython/blob/3f1ea163ea54513e00e0e9d5442fee1b639825cc/Lib/asyncio/threads.py#L12-L25
        """
        loop = events.get_running_loop()
        ctx = contextvars.copy_context()
        func_call = functools.partial(ctx.run, func, *args, **kwargs)
        return await loop.run_in_executor(None, func_call)


if sys.version_info >= (3, 9):
    from random import randbytes
else:
    try:
        import numpy

        def randbytes(size):
            """Return *size* random bytes, generated with numpy."""
            # One uint8 ("u1") per requested byte, drawn from [0, 256).
            # A wider dtype such as "u8" would emit 8 bytes per element and
            # an upper bound of 255 would never produce the byte 0xFF.
            return numpy.random.randint(256, size=size, dtype="u1").tobytes()

    except ImportError:
        import secrets

        def randbytes(size):
            """Return *size* random bytes, generated with ``secrets``."""
            return secrets.token_bytes(size)


if tornado.version_info >= (6, 2, 0, 0):
    from tornado.ioloop import PeriodicCallback
else:
    # Backport from https://github.com/tornadoweb/tornado/blob/a4f08a31a348445094d1efa17880ed5472db9f7d/tornado/ioloop.py#L838-L962
    # License https://github.com/tornadoweb/tornado/blob/v6.2.0/LICENSE
    # Includes minor modifications to source code to pass linting

    # This backport ensures that async callbacks are not overlapping if a run
    # takes longer than the interval
    import datetime
    import math
    import random
    from inspect import isawaitable
    from typing import Awaitable, Callable

    from tornado.ioloop import IOLoop
    from tornado.log import app_log

    class PeriodicCallback:  # type: ignore[no-redef]
        """Schedules the given callback to be called periodically.

        The callback is called every ``callback_time`` milliseconds when
        ``callback_time`` is a float. Note that the timeout is given in
        milliseconds, while most other time-related functions in Tornado use
        seconds. ``callback_time`` may alternatively be given as a
        `datetime.timedelta` object.

        If ``jitter`` is specified, each callback time will be randomly selected
        within a window of ``jitter * callback_time`` milliseconds.
        Jitter can be used to reduce alignment of events with similar periods.
        A jitter of 0.1 means allowing a 10% variation in callback time.
        The window is centered on ``callback_time`` so the total number of calls
        within a given interval should not be significantly affected by adding
        jitter.

        If the callback runs for longer than ``callback_time`` milliseconds,
        subsequent invocations will be skipped to get back on schedule.

        `start` must be called after the `PeriodicCallback` is created.

        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been removed.

        .. versionchanged:: 5.1
           The ``jitter`` argument is added.

        .. versionchanged:: 6.2
           If the ``callback`` argument is a coroutine, and a callback runs for
           longer than ``callback_time``, subsequent invocations will be skipped.
           Previously this was only true for regular functions, not coroutines,
           which were "fire-and-forget" for `PeriodicCallback`.

           The ``callback_time`` argument now accepts `datetime.timedelta` objects,
           in addition to the previous numeric milliseconds.
        """

        def __init__(
            self,
            callback: Callable[[], Awaitable | None],
            callback_time: datetime.timedelta | float,
            jitter: float = 0,
        ) -> None:
            """Store the callback and its period; the timer is not started.

            Raises
            ------
            ValueError
                If ``callback_time`` (number of milliseconds or
                `datetime.timedelta`) is not strictly positive.
            """
            self.callback = callback
            if isinstance(callback_time, datetime.timedelta):
                # Dividing by a 1 ms timedelta yields float milliseconds.
                callback_time_ms = callback_time / datetime.timedelta(milliseconds=1)
            else:
                callback_time_ms = callback_time
            # Validate after conversion so that timedelta input is checked as
            # well: a zero period would divide by zero in _update_next and a
            # negative one would keep scheduling timeouts in the past.
            if callback_time_ms <= 0:
                raise ValueError(
                    "Periodic callback must have a positive callback_time"
                )
            self.callback_time = callback_time_ms
            self.jitter = jitter
            self._running = False
            self._timeout = None  # type: object

        def start(self) -> None:
            """Starts the timer."""
            # Looking up the IOLoop here allows to first instantiate the
            # PeriodicCallback in another thread, then start it using
            # IOLoop.add_callback().
            self.io_loop = IOLoop.current()
            self._running = True
            self._next_timeout = self.io_loop.time()
            self._schedule_next()

        def stop(self) -> None:
            """Stops the timer."""
            self._running = False
            if self._timeout is not None:
                self.io_loop.remove_timeout(self._timeout)
                self._timeout = None

        def is_running(self) -> bool:
            """Returns ``True`` if this `.PeriodicCallback` has been started.

            .. versionadded:: 4.1
            """
            return self._running

        async def _run(self) -> None:
            """Invoke the callback once, then schedule the next invocation.

            An awaitable result is awaited before rescheduling, so runs of
            the same callback do not overlap. Exceptions raised by the
            callback are logged and do not stop the timer.
            """
            if not self._running:
                return
            try:
                val = self.callback()
                if val is not None and isawaitable(val):
                    await val
            except Exception:
                app_log.error("Exception in callback %r", self.callback, exc_info=True)
            finally:
                self._schedule_next()

        def _schedule_next(self) -> None:
            """Register the next `_run` timeout if the timer is still running."""
            if self._running:
                self._update_next(self.io_loop.time())
                self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

        def _update_next(self, current_time: float) -> None:
            """Advance ``self._next_timeout`` past ``current_time``."""
            callback_time_sec = self.callback_time / 1000.0
            if self.jitter:
                # apply jitter fraction
                callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5))
            if self._next_timeout <= current_time:
                # The period should be measured from the start of one call
                # to the start of the next. If one call takes too long,
                # skip cycles to get back to a multiple of the original
                # schedule.
                self._next_timeout += (
                    math.floor((current_time - self._next_timeout) / callback_time_sec)
                    + 1
                ) * callback_time_sec
            else:
                # If the clock moved backwards, ensure we advance the next
                # timeout instead of recomputing the same value again.
                # This may result in long gaps between callbacks if the
                # clock jumps backwards by a lot, but the far more common
                # scenario is a small NTP adjustment that should just be
                # ignored.
                #
                # Note that on some systems if time.time() runs slower
                # than time.monotonic() (most common on windows), we
                # effectively experience a small backwards time jump on
                # every iteration because PeriodicCallback uses
                # time.time() while asyncio schedules callbacks using
                # time.monotonic().
                # https://github.com/tornadoweb/tornado/issues/2333
                self._next_timeout += callback_time_sec