Spaces:
				
			
			
	
			
			
		Running
		
			on 
			
			Zero
	
	
	
			
			
	
	
	
	
		
		
		Running
		
			on 
			
			Zero
	| from __future__ import annotations | |
| import logging | |
| import sys | |
| import tornado | |
| logging_names: dict[str | int, int | str] = {} | |
| logging_names.update(logging._levelToName) # type: ignore | |
| logging_names.update(logging._nameToLevel) # type: ignore | |
| LINUX = sys.platform == "linux" | |
| MACOS = sys.platform == "darwin" | |
| WINDOWS = sys.platform == "win32" | |
| if sys.version_info >= (3, 9): | |
| from asyncio import to_thread | |
| else: | |
| import contextvars | |
| import functools | |
| from asyncio import events | |
| async def to_thread(func, /, *args, **kwargs): | |
| """Asynchronously run function *func* in a separate thread. | |
| Any *args and **kwargs supplied for this function are directly passed | |
| to *func*. Also, the current :class:`contextvars.Context` is propagated, | |
| allowing context variables from the main thread to be accessed in the | |
| separate thread. | |
| Return a coroutine that can be awaited to get the eventual result of *func*. | |
| backport from | |
| https://github.com/python/cpython/blob/3f1ea163ea54513e00e0e9d5442fee1b639825cc/Lib/asyncio/threads.py#L12-L25 | |
| """ | |
| loop = events.get_running_loop() | |
| ctx = contextvars.copy_context() | |
| func_call = functools.partial(ctx.run, func, *args, **kwargs) | |
| return await loop.run_in_executor(None, func_call) | |
| if sys.version_info >= (3, 9): | |
| from random import randbytes | |
| else: | |
| try: | |
| import numpy | |
| def randbytes(size): | |
| return numpy.random.randint(255, size=size, dtype="u8").tobytes() | |
| except ImportError: | |
| import secrets | |
| def randbytes(size): | |
| return secrets.token_bytes(size) | |
| if tornado.version_info >= (6, 2, 0, 0): | |
| from tornado.ioloop import PeriodicCallback | |
| else: | |
| # Backport from https://github.com/tornadoweb/tornado/blob/a4f08a31a348445094d1efa17880ed5472db9f7d/tornado/ioloop.py#L838-L962 | |
| # License https://github.com/tornadoweb/tornado/blob/v6.2.0/LICENSE | |
| # Includes minor modifications to source code to pass linting | |
| # This backport ensures that async callbacks are not overlapping if a run | |
| # takes longer than the interval | |
| import datetime | |
| import math | |
| import random | |
| from inspect import isawaitable | |
| from typing import Awaitable, Callable | |
| from tornado.ioloop import IOLoop | |
| from tornado.log import app_log | |
| class PeriodicCallback: # type: ignore[no-redef] | |
| """Schedules the given callback to be called periodically. | |
| The callback is called every ``callback_time`` milliseconds when | |
| ``callback_time`` is a float. Note that the timeout is given in | |
| milliseconds, while most other time-related functions in Tornado use | |
| seconds. ``callback_time`` may alternatively be given as a | |
| `datetime.timedelta` object. | |
| If ``jitter`` is specified, each callback time will be randomly selected | |
| within a window of ``jitter * callback_time`` milliseconds. | |
| Jitter can be used to reduce alignment of events with similar periods. | |
| A jitter of 0.1 means allowing a 10% variation in callback time. | |
| The window is centered on ``callback_time`` so the total number of calls | |
| within a given interval should not be significantly affected by adding | |
| jitter. | |
| If the callback runs for longer than ``callback_time`` milliseconds, | |
| subsequent invocations will be skipped to get back on schedule. | |
| `start` must be called after the `PeriodicCallback` is created. | |
| .. versionchanged:: 5.0 | |
| The ``io_loop`` argument (deprecated since version 4.1) has been removed. | |
| .. versionchanged:: 5.1 | |
| The ``jitter`` argument is added. | |
| .. versionchanged:: 6.2 | |
| If the ``callback`` argument is a coroutine, and a callback runs for | |
| longer than ``callback_time``, subsequent invocations will be skipped. | |
| Previously this was only true for regular functions, not coroutines, | |
| which were "fire-and-forget" for `PeriodicCallback`. | |
| The ``callback_time`` argument now accepts `datetime.timedelta` objects, | |
| in addition to the previous numeric milliseconds. | |
| """ | |
| def __init__( | |
| self, | |
| callback: Callable[[], Awaitable | None], | |
| callback_time: datetime.timedelta | float, | |
| jitter: float = 0, | |
| ) -> None: | |
| self.callback = callback | |
| if isinstance(callback_time, datetime.timedelta): | |
| self.callback_time = callback_time / datetime.timedelta(milliseconds=1) | |
| else: | |
| if callback_time <= 0: | |
| raise ValueError( | |
| "Periodic callback must have a positive callback_time" | |
| ) | |
| self.callback_time = callback_time | |
| self.jitter = jitter | |
| self._running = False | |
| self._timeout = None # type: object | |
| def start(self) -> None: | |
| """Starts the timer.""" | |
| # Looking up the IOLoop here allows to first instantiate the | |
| # PeriodicCallback in another thread, then start it using | |
| # IOLoop.add_callback(). | |
| self.io_loop = IOLoop.current() | |
| self._running = True | |
| self._next_timeout = self.io_loop.time() | |
| self._schedule_next() | |
| def stop(self) -> None: | |
| """Stops the timer.""" | |
| self._running = False | |
| if self._timeout is not None: | |
| self.io_loop.remove_timeout(self._timeout) | |
| self._timeout = None | |
| def is_running(self) -> bool: | |
| """Returns ``True`` if this `.PeriodicCallback` has been started. | |
| .. versionadded:: 4.1 | |
| """ | |
| return self._running | |
| async def _run(self) -> None: | |
| if not self._running: | |
| return | |
| try: | |
| val = self.callback() | |
| if val is not None and isawaitable(val): | |
| await val | |
| except Exception: | |
| app_log.error("Exception in callback %r", self.callback, exc_info=True) | |
| finally: | |
| self._schedule_next() | |
| def _schedule_next(self) -> None: | |
| if self._running: | |
| self._update_next(self.io_loop.time()) | |
| self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) | |
| def _update_next(self, current_time: float) -> None: | |
| callback_time_sec = self.callback_time / 1000.0 | |
| if self.jitter: | |
| # apply jitter fraction | |
| callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5)) | |
| if self._next_timeout <= current_time: | |
| # The period should be measured from the start of one call | |
| # to the start of the next. If one call takes too long, | |
| # skip cycles to get back to a multiple of the original | |
| # schedule. | |
| self._next_timeout += ( | |
| math.floor((current_time - self._next_timeout) / callback_time_sec) | |
| + 1 | |
| ) * callback_time_sec | |
| else: | |
| # If the clock moved backwards, ensure we advance the next | |
| # timeout instead of recomputing the same value again. | |
| # This may result in long gaps between callbacks if the | |
| # clock jumps backwards by a lot, but the far more common | |
| # scenario is a small NTP adjustment that should just be | |
| # ignored. | |
| # | |
| # Note that on some systems if time.time() runs slower | |
| # than time.monotonic() (most common on windows), we | |
| # effectively experience a small backwards time jump on | |
| # every iteration because PeriodicCallback uses | |
| # time.time() while asyncio schedules callbacks using | |
| # time.monotonic(). | |
| # https://github.com/tornadoweb/tornado/issues/2333 | |
| self._next_timeout += callback_time_sec | |