Spaces:
Runtime error
Runtime error
# Copyright 2016β2021 Julien Danjou | |
# Copyright 2016 Joshua Harlow | |
# Copyright 2013-2014 Ray Holder | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import abc | |
import random | |
import typing | |
from pip._vendor.tenacity import _utils | |
if typing.TYPE_CHECKING: | |
from pip._vendor.tenacity import RetryCallState | |
class wait_base(abc.ABC): | |
"""Abstract base class for wait strategies.""" | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
pass | |
def __add__(self, other: "wait_base") -> "wait_combine": | |
return wait_combine(self, other) | |
def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]: | |
# make it possible to use multiple waits with the built-in sum function | |
if other == 0: | |
return self | |
return self.__add__(other) | |
class wait_fixed(wait_base): | |
"""Wait strategy that waits a fixed amount of time between each retry.""" | |
def __init__(self, wait: float) -> None: | |
self.wait_fixed = wait | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
return self.wait_fixed | |
class wait_none(wait_fixed): | |
"""Wait strategy that doesn't wait at all before retrying.""" | |
def __init__(self) -> None: | |
super().__init__(0) | |
class wait_random(wait_base): | |
"""Wait strategy that waits a random amount of time between min/max.""" | |
def __init__(self, min: typing.Union[int, float] = 0, max: typing.Union[int, float] = 1) -> None: # noqa | |
self.wait_random_min = min | |
self.wait_random_max = max | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
return self.wait_random_min + (random.random() * (self.wait_random_max - self.wait_random_min)) | |
class wait_combine(wait_base): | |
"""Combine several waiting strategies.""" | |
def __init__(self, *strategies: wait_base) -> None: | |
self.wait_funcs = strategies | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
return sum(x(retry_state=retry_state) for x in self.wait_funcs) | |
class wait_chain(wait_base): | |
"""Chain two or more waiting strategies. | |
If all strategies are exhausted, the very last strategy is used | |
thereafter. | |
For example:: | |
@retry(wait=wait_chain(*[wait_fixed(1) for i in range(3)] + | |
[wait_fixed(2) for j in range(5)] + | |
[wait_fixed(5) for k in range(4))) | |
def wait_chained(): | |
print("Wait 1s for 3 attempts, 2s for 5 attempts and 5s | |
thereafter.") | |
""" | |
def __init__(self, *strategies: wait_base) -> None: | |
self.strategies = strategies | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
wait_func_no = min(max(retry_state.attempt_number, 1), len(self.strategies)) | |
wait_func = self.strategies[wait_func_no - 1] | |
return wait_func(retry_state=retry_state) | |
class wait_incrementing(wait_base): | |
"""Wait an incremental amount of time after each attempt. | |
Starting at a starting value and incrementing by a value for each attempt | |
(and restricting the upper limit to some maximum value). | |
""" | |
def __init__( | |
self, | |
start: typing.Union[int, float] = 0, | |
increment: typing.Union[int, float] = 100, | |
max: typing.Union[int, float] = _utils.MAX_WAIT, # noqa | |
) -> None: | |
self.start = start | |
self.increment = increment | |
self.max = max | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
result = self.start + (self.increment * (retry_state.attempt_number - 1)) | |
return max(0, min(result, self.max)) | |
class wait_exponential(wait_base): | |
"""Wait strategy that applies exponential backoff. | |
It allows for a customized multiplier and an ability to restrict the | |
upper and lower limits to some maximum and minimum value. | |
The intervals are fixed (i.e. there is no jitter), so this strategy is | |
suitable for balancing retries against latency when a required resource is | |
unavailable for an unknown duration, but *not* suitable for resolving | |
contention between multiple processes for a shared resource. Use | |
wait_random_exponential for the latter case. | |
""" | |
def __init__( | |
self, | |
multiplier: typing.Union[int, float] = 1, | |
max: typing.Union[int, float] = _utils.MAX_WAIT, # noqa | |
exp_base: typing.Union[int, float] = 2, | |
min: typing.Union[int, float] = 0, # noqa | |
) -> None: | |
self.multiplier = multiplier | |
self.min = min | |
self.max = max | |
self.exp_base = exp_base | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
try: | |
exp = self.exp_base ** (retry_state.attempt_number - 1) | |
result = self.multiplier * exp | |
except OverflowError: | |
return self.max | |
return max(max(0, self.min), min(result, self.max)) | |
class wait_random_exponential(wait_exponential): | |
"""Random wait with exponentially widening window. | |
An exponential backoff strategy used to mediate contention between multiple | |
uncoordinated processes for a shared resource in distributed systems. This | |
is the sense in which "exponential backoff" is meant in e.g. Ethernet | |
networking, and corresponds to the "Full Jitter" algorithm described in | |
this blog post: | |
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ | |
Each retry occurs at a random time in a geometrically expanding interval. | |
It allows for a custom multiplier and an ability to restrict the upper | |
limit of the random interval to some maximum value. | |
Example:: | |
wait_random_exponential(multiplier=0.5, # initial window 0.5s | |
max=60) # max 60s timeout | |
When waiting for an unavailable resource to become available again, as | |
opposed to trying to resolve contention for a shared resource, the | |
wait_exponential strategy (which uses a fixed interval) may be preferable. | |
""" | |
def __call__(self, retry_state: "RetryCallState") -> float: | |
high = super().__call__(retry_state=retry_state) | |
return random.uniform(0, high) | |