|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import abc |
|
import random |
|
import typing |
|
from datetime import timedelta |
|
|
|
from pip._vendor.tenacity import _utils |
|
|
|
if typing.TYPE_CHECKING: |
|
from pip._vendor.tenacity import RetryCallState |
|
|
|
wait_unit_type = typing.Union[int, float, timedelta] |
|
|
|
|
|
def to_seconds(wait_unit: wait_unit_type) -> float:
    """Normalize a wait value to a float number of seconds.

    A ``timedelta`` is converted through ``total_seconds()``; any other value
    is handed to ``float()`` as-is.
    """
    if isinstance(wait_unit, timedelta):
        seconds = wait_unit.total_seconds()
    else:
        seconds = wait_unit
    return float(seconds)
|
|
|
|
|
class wait_base(abc.ABC):
    """Abstract base class for wait strategies.

    A strategy is a callable that receives the retry state and returns the
    wait as a float. Strategies support ``+``, which wraps both operands in a
    ``wait_combine``.
    """

    @abc.abstractmethod
    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the wait for ``retry_state``; subclasses must implement this."""

    def __add__(self, other: "wait_base") -> "wait_combine":
        """Return a ``wait_combine`` built from this strategy and ``other``."""
        return wait_combine(self, other)

    def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]:
        """Handle ``other + self`` when ``other`` does not know how to add.

        The built-in ``sum`` starts from the integer ``0``; treating that
        start value as a no-op lets ``sum([...])`` work on wait strategies.
        """
        return self if other == 0 else self.__add__(other)
|
|
|
|
|
class wait_fixed(wait_base):
    """Wait strategy that waits a fixed amount of time between each retry.

    ``wait`` may be a number of seconds or a ``timedelta``; it is converted
    once, at construction time, via ``to_seconds``.
    """

    def __init__(self, wait: wait_unit_type) -> None:
        seconds = to_seconds(wait)
        self.wait_fixed = seconds

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the configured wait; ``retry_state`` is not consulted."""
        return self.wait_fixed
|
|
|
|
|
class wait_none(wait_fixed):
    """Wait strategy that doesn't wait at all before retrying.

    Implemented as a ``wait_fixed`` whose wait is ``0``.
    """

    def __init__(self) -> None:
        # Delegate to the fixed-wait initializer with a zero wait.
        super().__init__(0)
|
|
|
|
|
class wait_random(wait_base):
    """Wait strategy that waits a random amount of time between min/max.

    Both bounds accept a number of seconds or a ``timedelta`` and are
    converted with ``to_seconds`` when the strategy is created.
    """

    def __init__(self, min: wait_unit_type = 0, max: wait_unit_type = 1) -> None:
        self.wait_random_min = to_seconds(min)
        self.wait_random_max = to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the lower bound plus a random fraction of the min/max span."""
        span = self.wait_random_max - self.wait_random_min
        return self.wait_random_min + (random.random() * span)
|
|
|
|
|
class wait_combine(wait_base):
    """Combine several waiting strategies.

    Calling the combined strategy calls every wrapped strategy with the same
    retry state and returns the sum of their results.
    """

    def __init__(self, *strategies: wait_base) -> None:
        self.wait_funcs = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the total of all wrapped strategies' waits."""
        waits = [strategy(retry_state=retry_state) for strategy in self.wait_funcs]
        return sum(waits)
|
|
|
|
|
class wait_chain(wait_base):
    """Chain two or more waiting strategies.

    The strategy at position ``attempt_number`` (1-based) is used for each
    attempt. If all strategies are exhausted, the very last strategy is used
    thereafter.

    For example::

        @retry(wait=wait_chain(*[wait_fixed(1) for i in range(3)] +
                               [wait_fixed(2) for j in range(5)] +
                               [wait_fixed(5) for k in range(4)]))
        def wait_chained():
            print("Wait 1s for 3 attempts, 2s for 5 attempts and 5s "
                  "thereafter.")
    """

    def __init__(self, *strategies: wait_base) -> None:
        self.strategies = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Delegate to the strategy selected by the current attempt number."""
        # Clamp the 1-based attempt number into [1, len(strategies)] so that
        # attempts past the end of the chain keep using the final strategy.
        position = max(retry_state.attempt_number, 1)
        position = min(position, len(self.strategies))
        chosen = self.strategies[position - 1]
        return chosen(retry_state=retry_state)
|
|
|
|
|
class wait_incrementing(wait_base):
    """Wait an incremental amount of time after each attempt.

    Starting at a starting value and incrementing by a value for each attempt
    (and restricting the upper limit to some maximum value). All three
    settings accept a number of seconds or a ``timedelta``.
    """

    def __init__(
        self,
        start: wait_unit_type = 0,
        increment: wait_unit_type = 100,
        max: wait_unit_type = _utils.MAX_WAIT,
    ) -> None:
        self.start = to_seconds(start)
        self.increment = to_seconds(increment)
        self.max = to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return ``start + increment * (attempt_number - 1)``, kept in ``[0, max]``."""
        previous_attempts = retry_state.attempt_number - 1
        uncapped = self.start + (self.increment * previous_attempts)
        capped = min(uncapped, self.max)
        # A negative start or increment must never yield a negative wait.
        return max(0, capped)
|
|
|
|
|
class wait_exponential(wait_base):
    """Wait strategy that applies exponential backoff.

    The wait is ``multiplier * exp_base ** (attempt_number - 1)``, restricted
    to lie between ``min`` (itself never below 0) and ``max``. ``min`` and
    ``max`` accept a number of seconds or a ``timedelta``.

    The intervals are fixed (i.e. there is no jitter), so this strategy is
    suitable for balancing retries against latency when a required resource is
    unavailable for an unknown duration, but *not* suitable for resolving
    contention between multiple processes for a shared resource. Use
    wait_random_exponential for the latter case.
    """

    def __init__(
        self,
        multiplier: typing.Union[int, float] = 1,
        max: wait_unit_type = _utils.MAX_WAIT,
        exp_base: typing.Union[int, float] = 2,
        min: wait_unit_type = 0,
    ) -> None:
        self.multiplier = multiplier
        self.min = to_seconds(min)
        self.max = to_seconds(max)
        self.exp_base = exp_base

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the backoff for the current attempt, clamped to the limits."""
        try:
            growth = self.exp_base ** (retry_state.attempt_number - 1)
            uncapped = self.multiplier * growth
        except OverflowError:
            # The value no longer fits in a float, so it is above any cap.
            return self.max
        floor = max(0, self.min)
        ceiling = min(uncapped, self.max)
        return max(floor, ceiling)
|
|
|
|
|
class wait_random_exponential(wait_exponential):
    """Random wait with exponentially widening window.

    An exponential backoff strategy used to mediate contention between multiple
    uncoordinated processes for a shared resource in distributed systems. This
    is the sense in which "exponential backoff" is meant in e.g. Ethernet
    networking, and corresponds to the "Full Jitter" algorithm described in
    this blog post:

    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

    Each retry occurs at a random time in a geometrically expanding interval.
    It allows for a custom multiplier and an ability to restrict the upper
    limit of the random interval to some maximum value. The lower limit of
    the interval is ``min`` (0 by default, and never below 0).

    Example::

        wait_random_exponential(multiplier=0.5,  # initial window 0.5s
                                max=60)          # max 60s timeout

    When waiting for an unavailable resource to become available again, as
    opposed to trying to resolve contention for a shared resource, the
    wait_exponential strategy (which uses a fixed interval) may be preferable.
    """

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return a random wait between the configured minimum and the backoff.

        The upper bound is the value ``wait_exponential`` computes for this
        attempt, which is already clamped to ``[max(0, min), max]``.
        """
        high = super().__call__(retry_state=retry_state)
        # Honor the inherited ``min`` setting instead of always drawing from
        # zero; the same non-negative floor as the parent keeps low <= high.
        low = max(0, self.min)
        return random.uniform(low, high)
|
|
|
|
|
class wait_exponential_jitter(wait_base):
    """Wait strategy that applies exponential backoff and jitter.

    It allows for a customized initial wait, maximum wait, exponent base and
    jitter.

    This implements the strategy described here:
    https://cloud.google.com/storage/docs/retry-strategy

    The wait time is
    min(initial * exp_base**n + random.uniform(0, jitter), maximum), floored
    at 0, where n is the attempt number minus one.
    """

    def __init__(
        self,
        initial: float = 1,
        max: float = _utils.MAX_WAIT,
        exp_base: float = 2,
        jitter: float = 1,
    ) -> None:
        self.initial = initial
        self.max = max
        self.exp_base = exp_base
        self.jitter = jitter

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the jittered backoff for the current attempt, capped at ``max``."""
        noise = random.uniform(0, self.jitter)
        try:
            growth = self.exp_base ** (retry_state.attempt_number - 1)
            delay = self.initial * growth + noise
        except OverflowError:
            # Too large to represent: fall back to the configured maximum.
            delay = self.max
        capped = min(delay, self.max)
        return max(0, capped)
|
|