Spaces:
Runtime error
Runtime error
import functools | |
import itertools | |
import sys | |
from signal import SIGINT, default_int_handler, signal | |
from typing import Any, Callable, Iterator, Optional, Tuple | |
from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar | |
from pip._vendor.progress.spinner import Spinner | |
from pip._vendor.rich.progress import ( | |
BarColumn, | |
DownloadColumn, | |
FileSizeColumn, | |
Progress, | |
ProgressColumn, | |
SpinnerColumn, | |
TextColumn, | |
TimeElapsedColumn, | |
TimeRemainingColumn, | |
TransferSpeedColumn, | |
) | |
from pip._internal.utils.compat import WINDOWS | |
from pip._internal.utils.logging import get_indentation | |
from pip._internal.utils.misc import format_size | |
try: | |
from pip._vendor import colorama | |
# Lots of different errors can come from this, including SystemError and | |
# ImportError. | |
except Exception: | |
colorama = None | |
DownloadProgressRenderer = Callable[[Iterator[bytes]], Iterator[bytes]] | |
def _select_progress_class(preferred: Bar, fallback: Bar) -> Bar: | |
encoding = getattr(preferred.file, "encoding", None) | |
# If we don't know what encoding this file is in, then we'll just assume | |
# that it doesn't support unicode and use the ASCII bar. | |
if not encoding: | |
return fallback | |
# Collect all of the possible characters we want to use with the preferred | |
# bar. | |
characters = [ | |
getattr(preferred, "empty_fill", ""), | |
getattr(preferred, "fill", ""), | |
] | |
characters += list(getattr(preferred, "phases", [])) | |
# Try to decode the characters we're using for the bar using the encoding | |
# of the given file, if this works then we'll assume that we can use the | |
# fancier bar and if not we'll fall back to the plaintext bar. | |
try: | |
"".join(characters).encode(encoding) | |
except UnicodeEncodeError: | |
return fallback | |
else: | |
return preferred | |
_BaseBar: Any = _select_progress_class(IncrementalBar, Bar) | |
class InterruptibleMixin: | |
""" | |
Helper to ensure that self.finish() gets called on keyboard interrupt. | |
This allows downloads to be interrupted without leaving temporary state | |
(like hidden cursors) behind. | |
This class is similar to the progress library's existing SigIntMixin | |
helper, but as of version 1.2, that helper has the following problems: | |
1. It calls sys.exit(). | |
2. It discards the existing SIGINT handler completely. | |
3. It leaves its own handler in place even after an uninterrupted finish, | |
which will have unexpected delayed effects if the user triggers an | |
unrelated keyboard interrupt some time after a progress-displaying | |
download has already completed, for example. | |
""" | |
def __init__(self, *args: Any, **kwargs: Any) -> None: | |
""" | |
Save the original SIGINT handler for later. | |
""" | |
# https://github.com/python/mypy/issues/5887 | |
super().__init__(*args, **kwargs) # type: ignore | |
self.original_handler = signal(SIGINT, self.handle_sigint) | |
# If signal() returns None, the previous handler was not installed from | |
# Python, and we cannot restore it. This probably should not happen, | |
# but if it does, we must restore something sensible instead, at least. | |
# The least bad option should be Python's default SIGINT handler, which | |
# just raises KeyboardInterrupt. | |
if self.original_handler is None: | |
self.original_handler = default_int_handler | |
def finish(self) -> None: | |
""" | |
Restore the original SIGINT handler after finishing. | |
This should happen regardless of whether the progress display finishes | |
normally, or gets interrupted. | |
""" | |
super().finish() # type: ignore | |
signal(SIGINT, self.original_handler) | |
def handle_sigint(self, signum, frame): # type: ignore | |
""" | |
Call self.finish() before delegating to the original SIGINT handler. | |
This handler should only be in place while the progress display is | |
active. | |
""" | |
self.finish() | |
self.original_handler(signum, frame) | |
class SilentBar(Bar): | |
def update(self) -> None: | |
pass | |
class BlueEmojiBar(IncrementalBar): | |
suffix = "%(percent)d%%" | |
bar_prefix = " " | |
bar_suffix = " " | |
phases = ("\U0001F539", "\U0001F537", "\U0001F535") | |
class DownloadProgressMixin: | |
def __init__(self, *args: Any, **kwargs: Any) -> None: | |
# https://github.com/python/mypy/issues/5887 | |
super().__init__(*args, **kwargs) # type: ignore | |
self.message: str = (" " * (get_indentation() + 2)) + self.message | |
def downloaded(self) -> str: | |
return format_size(self.index) # type: ignore | |
def download_speed(self) -> str: | |
# Avoid zero division errors... | |
if self.avg == 0.0: # type: ignore | |
return "..." | |
return format_size(1 / self.avg) + "/s" # type: ignore | |
def pretty_eta(self) -> str: | |
if self.eta: # type: ignore | |
return f"eta {self.eta_td}" # type: ignore | |
return "" | |
def iter(self, it): # type: ignore | |
for x in it: | |
yield x | |
# B305 is incorrectly raised here | |
# https://github.com/PyCQA/flake8-bugbear/issues/59 | |
self.next(len(x)) # noqa: B305 | |
self.finish() | |
class WindowsMixin: | |
def __init__(self, *args: Any, **kwargs: Any) -> None: | |
# The Windows terminal does not support the hide/show cursor ANSI codes | |
# even with colorama. So we'll ensure that hide_cursor is False on | |
# Windows. | |
# This call needs to go before the super() call, so that hide_cursor | |
# is set in time. The base progress bar class writes the "hide cursor" | |
# code to the terminal in its init, so if we don't set this soon | |
# enough, we get a "hide" with no corresponding "show"... | |
if WINDOWS and self.hide_cursor: # type: ignore | |
self.hide_cursor = False | |
# https://github.com/python/mypy/issues/5887 | |
super().__init__(*args, **kwargs) # type: ignore | |
# Check if we are running on Windows and we have the colorama module, | |
# if we do then wrap our file with it. | |
if WINDOWS and colorama: | |
self.file = colorama.AnsiToWin32(self.file) # type: ignore | |
# The progress code expects to be able to call self.file.isatty() | |
# but the colorama.AnsiToWin32() object doesn't have that, so we'll | |
# add it. | |
self.file.isatty = lambda: self.file.wrapped.isatty() | |
# The progress code expects to be able to call self.file.flush() | |
# but the colorama.AnsiToWin32() object doesn't have that, so we'll | |
# add it. | |
self.file.flush = lambda: self.file.wrapped.flush() | |
class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin, DownloadProgressMixin): | |
file = sys.stdout | |
message = "%(percent)d%%" | |
suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s" | |
class DefaultDownloadProgressBar(BaseDownloadProgressBar, _BaseBar): | |
pass | |
class DownloadSilentBar(BaseDownloadProgressBar, SilentBar): | |
pass | |
class DownloadBar(BaseDownloadProgressBar, Bar): | |
pass | |
class DownloadFillingCirclesBar(BaseDownloadProgressBar, FillingCirclesBar): | |
pass | |
class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, BlueEmojiBar): | |
pass | |
class DownloadProgressSpinner( | |
WindowsMixin, InterruptibleMixin, DownloadProgressMixin, Spinner | |
): | |
file = sys.stdout | |
suffix = "%(downloaded)s %(download_speed)s" | |
def next_phase(self) -> str: | |
if not hasattr(self, "_phaser"): | |
self._phaser = itertools.cycle(self.phases) | |
return next(self._phaser) | |
def update(self) -> None: | |
message = self.message % self | |
phase = self.next_phase() | |
suffix = self.suffix % self | |
line = "".join( | |
[ | |
message, | |
" " if message else "", | |
phase, | |
" " if suffix else "", | |
suffix, | |
] | |
) | |
self.writeln(line) | |
BAR_TYPES = { | |
"off": (DownloadSilentBar, DownloadSilentBar), | |
"on": (DefaultDownloadProgressBar, DownloadProgressSpinner), | |
"ascii": (DownloadBar, DownloadProgressSpinner), | |
"pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner), | |
"emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner), | |
} | |
def _legacy_progress_bar( | |
progress_bar: str, max: Optional[int] | |
) -> DownloadProgressRenderer: | |
if max is None or max == 0: | |
return BAR_TYPES[progress_bar][1]().iter # type: ignore | |
else: | |
return BAR_TYPES[progress_bar][0](max=max).iter | |
# | |
# Modern replacement, for our legacy progress bars. | |
# | |
def _rich_progress_bar( | |
iterable: Iterator[bytes], | |
*, | |
bar_type: str, | |
size: int, | |
) -> Iterator[bytes]: | |
assert bar_type == "on", "This should only be used in the default mode." | |
if not size: | |
total = float("inf") | |
columns: Tuple[ProgressColumn, ...] = ( | |
TextColumn("[progress.description]{task.description}"), | |
SpinnerColumn("line", speed=1.5), | |
FileSizeColumn(), | |
TransferSpeedColumn(), | |
TimeElapsedColumn(), | |
) | |
else: | |
total = size | |
columns = ( | |
TextColumn("[progress.description]{task.description}"), | |
BarColumn(), | |
DownloadColumn(), | |
TransferSpeedColumn(), | |
TextColumn("eta"), | |
TimeRemainingColumn(), | |
) | |
progress = Progress(*columns, refresh_per_second=30) | |
task_id = progress.add_task(" " * (get_indentation() + 2), total=total) | |
with progress: | |
for chunk in iterable: | |
yield chunk | |
progress.update(task_id, advance=len(chunk)) | |
def get_download_progress_renderer( | |
*, bar_type: str, size: Optional[int] = None | |
) -> DownloadProgressRenderer: | |
"""Get an object that can be used to render the download progress. | |
Returns a callable, that takes an iterable to "wrap". | |
""" | |
if bar_type == "on": | |
return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size) | |
elif bar_type == "off": | |
return iter # no-op, when passed an iterator | |
else: | |
return _legacy_progress_bar(bar_type, size) | |