Spaces:
Running
on
Zero
Running
on
Zero
from __future__ import annotations | |
import collections | |
import time as timemod | |
from collections.abc import Callable | |
from functools import wraps | |
import psutil | |
from distributed.compatibility import WINDOWS | |
_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ()) | |
def _psutil_caller(method_name, default=_empty_namedtuple): | |
""" | |
Return a function calling the given psutil *method_name*, | |
or returning *default* if psutil fails. | |
""" | |
meth = getattr(psutil, method_name) | |
def wrapper(): # pragma: no cover | |
try: | |
return meth() | |
except RuntimeError: | |
# This can happen on some systems (e.g. no physical disk in worker) | |
return default() | |
return wrapper | |
disk_io_counters = _psutil_caller("disk_io_counters") | |
net_io_counters = _psutil_caller("net_io_counters") | |
class _WindowsTime: | |
"""Combine time.time() or time.monotonic() with time.perf_counter() to get an | |
absolute clock with fine resolution. | |
""" | |
base_timer: Callable[[], float] | |
delta: float | |
previous: float | None | |
next_resync: float | |
resync_every: float | |
def __init__( | |
self, base: Callable[[], float], is_monotonic: bool, resync_every: float = 600.0 | |
): | |
self.base_timer = base | |
self.previous = float("-inf") if is_monotonic else None | |
self.next_resync = float("-inf") | |
self.resync_every = resync_every | |
def time(self) -> float: | |
cur = timemod.perf_counter() | |
if cur > self.next_resync: | |
self.resync() | |
self.next_resync = cur + self.resync_every | |
cur += self.delta | |
if self.previous is not None: | |
# Monotonic timer | |
if cur <= self.previous: | |
cur = self.previous + 1e-9 | |
self.previous = cur | |
return cur | |
def resync(self) -> None: | |
_time = self.base_timer | |
_perf_counter = timemod.perf_counter | |
min_samples = 5 | |
while True: | |
times = [(_time(), _perf_counter()) for _ in range(min_samples * 2)] | |
abs_times = collections.Counter(t[0] for t in times) | |
first, nfirst = abs_times.most_common()[0] | |
if nfirst < min_samples: | |
# System too noisy? Start again | |
continue | |
perf_times = [t[1] for t in times if t[0] == first][:-1] | |
assert len(perf_times) >= min_samples - 1, perf_times | |
self.delta = first - sum(perf_times) / len(perf_times) | |
break | |
# A high-resolution wall clock timer measuring the seconds since Unix epoch | |
if WINDOWS: | |
time = _WindowsTime(timemod.time, is_monotonic=False).time | |
monotonic = _WindowsTime(timemod.monotonic, is_monotonic=True).time | |
else: | |
# Under modern Unixes, time.time() and time.monotonic() should be good enough | |
time = timemod.time | |
monotonic = timemod.monotonic | |
process_time = timemod.process_time | |
# Get a per-thread CPU timer function if possible, otherwise | |
# use a per-process CPU timer function. | |
try: | |
# thread_time is not supported on all platforms | |
thread_time = timemod.thread_time | |
except (AttributeError, OSError): # pragma: no cover | |
thread_time = process_time | |