Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,216 Bytes
d1ed09d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
from __future__ import annotations
import collections
import time as timemod
from collections.abc import Callable
from functools import wraps
import psutil
from distributed.compatibility import WINDOWS
_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())
def _psutil_caller(method_name, default=_empty_namedtuple):
"""
Return a function calling the given psutil *method_name*,
or returning *default* if psutil fails.
"""
meth = getattr(psutil, method_name)
@wraps(meth)
def wrapper(): # pragma: no cover
try:
return meth()
except RuntimeError:
# This can happen on some systems (e.g. no physical disk in worker)
return default()
return wrapper
disk_io_counters = _psutil_caller("disk_io_counters")
net_io_counters = _psutil_caller("net_io_counters")
class _WindowsTime:
"""Combine time.time() or time.monotonic() with time.perf_counter() to get an
absolute clock with fine resolution.
"""
base_timer: Callable[[], float]
delta: float
previous: float | None
next_resync: float
resync_every: float
def __init__(
self, base: Callable[[], float], is_monotonic: bool, resync_every: float = 600.0
):
self.base_timer = base
self.previous = float("-inf") if is_monotonic else None
self.next_resync = float("-inf")
self.resync_every = resync_every
def time(self) -> float:
cur = timemod.perf_counter()
if cur > self.next_resync:
self.resync()
self.next_resync = cur + self.resync_every
cur += self.delta
if self.previous is not None:
# Monotonic timer
if cur <= self.previous:
cur = self.previous + 1e-9
self.previous = cur
return cur
def resync(self) -> None:
_time = self.base_timer
_perf_counter = timemod.perf_counter
min_samples = 5
while True:
times = [(_time(), _perf_counter()) for _ in range(min_samples * 2)]
abs_times = collections.Counter(t[0] for t in times)
first, nfirst = abs_times.most_common()[0]
if nfirst < min_samples:
# System too noisy? Start again
continue
perf_times = [t[1] for t in times if t[0] == first][:-1]
assert len(perf_times) >= min_samples - 1, perf_times
self.delta = first - sum(perf_times) / len(perf_times)
break
# A high-resolution wall clock timer measuring the seconds since Unix epoch
if WINDOWS:
time = _WindowsTime(timemod.time, is_monotonic=False).time
monotonic = _WindowsTime(timemod.monotonic, is_monotonic=True).time
else:
# Under modern Unixes, time.time() and time.monotonic() should be good enough
time = timemod.time
monotonic = timemod.monotonic
process_time = timemod.process_time
# Get a per-thread CPU timer function if possible, otherwise
# use a per-process CPU timer function.
try:
# thread_time is not supported on all platforms
thread_time = timemod.thread_time
except (AttributeError, OSError): # pragma: no cover
thread_time = process_time
|