Spaces:
Runtime error
Runtime error
""" | |
Formatting many files at once via multiprocessing. Contains entrypoint and utilities. | |
NOTE: this module is only imported if we need to format several files at once. | |
""" | |
import asyncio | |
import logging | |
import os | |
import signal | |
import sys | |
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor | |
from multiprocessing import Manager | |
from pathlib import Path | |
from typing import Any, Iterable, Optional, Set | |
from mypy_extensions import mypyc_attr | |
from black import WriteBack, format_file_in_place | |
from black.cache import Cache, filter_cached, read_cache, write_cache | |
from black.mode import Mode | |
from black.output import err | |
from black.report import Changed, Report | |
def maybe_install_uvloop() -> None:
    """Install uvloop when it can be imported; otherwise do nothing.

    Intended to be called only from command-line entry points, so that using
    Black as a library does not interfere with the parent process.
    """
    try:
        import uvloop as optional_uvloop

        optional_uvloop.install()
    except ImportError:
        # uvloop is an optional extra; without it nothing is installed here.
        return
def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
    """Signal handler for asyncio: report the abort, then cancel every task.

    Writes "Aborted!" through `err` and calls `.cancel()` on each element of
    `tasks`.
    """
    err("Aborted!")
    for pending_task in tasks:
        pending_task.cancel()
def shutdown(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel whatever is still pending on `loop`, drain it, and close `loop`.

    The loop is closed (and the "concurrent.futures" logger raised to
    CRITICAL) even when there was nothing to cancel or draining raised.
    """
    try:
        # This part is borrowed from asyncio/runners.py in Python 3.7b2.
        unfinished = [t for t in asyncio.all_tasks(loop) if not t.done()]
        if unfinished:
            for unfinished_task in unfinished:
                unfinished_task.cancel()
            # Let every cancelled task run to completion; exceptions are
            # collected rather than raised.
            drained = asyncio.gather(*unfinished, return_exceptions=True)
            loop.run_until_complete(drained)
    finally:
        # `concurrent.futures.Future` objects cannot be cancelled once they
        # are already running. There might be some when the `shutdown()` happened.
        # Silence their logger's spew about the event loop being closed.
        logging.getLogger("concurrent.futures").setLevel(logging.CRITICAL)
        loop.close()
# diff-shades depends on being able to monkeypatch this function to operate. I know
# it's not ideal, but this shouldn't cause any issues ... hopefully. ~ichard26
def reformat_many(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: Report,
    workers: Optional[int],
) -> None:
    """Reformat multiple files, preferably with a ProcessPoolExecutor.

    Worker count: `workers` if given; otherwise the integer value of the
    ``BLACK_NUM_WORKERS`` environment variable, then ``os.cpu_count()``, then 1
    (first truthy value wins). On win32 the count is capped at 60.

    A fresh event loop is created and set as the current loop, used to run
    :func:`schedule_formatting`, and torn down afterwards. The current event
    loop is reset to None and the executor is shut down.
    """
    maybe_install_uvloop()

    worker_count = workers
    if worker_count is None:
        env_setting = int(os.environ.get("BLACK_NUM_WORKERS", 0))
        worker_count = env_setting or os.cpu_count() or 1
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        worker_count = min(worker_count, 60)

    executor: Executor
    try:
        executor = ProcessPoolExecutor(max_workers=worker_count)
    except (ImportError, NotImplementedError, OSError):
        # we arrive here if the underlying system does not support multi-processing
        # like in AWS Lambda or Termux, in which case we gracefully fallback to
        # a ThreadPoolExecutor with just a single worker (more workers would not do us
        # any good due to the Global Interpreter Lock)
        executor = ThreadPoolExecutor(max_workers=1)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        formatting_job = schedule_formatting(
            sources=sources,
            fast=fast,
            write_back=write_back,
            mode=mode,
            report=report,
            loop=loop,
            executor=executor,
        )
        loop.run_until_complete(formatting_job)
    finally:
        try:
            shutdown(loop)
        finally:
            # Always detach the loop from the current context.
            asyncio.set_event_loop(None)
        if executor is not None:
            executor.shutdown()
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: "Executor",
) -> None:
    """Format `sources` concurrently on `executor` and record results in `report`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `fast`, `mode`, and `write_back` are forwarded to
    :func:`format_file_in_place`. Unless a diff is being produced, files already
    present in the cache are reported as ``Changed.CACHED`` and skipped, and
    files that were written back (or checked and found unchanged) are added to
    the cache at the end.
    """
    diff_output = write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF)

    cache: Cache = {}
    if not diff_output:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for cached_src in sorted(cached):
            report.done(cached_src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []

    lock = None
    if diff_output:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        # NOTE(review): `manager` is kept as a named local on purpose; presumably
        # the manager must stay referenced while `lock` is in use -- confirm.
        manager = Manager()
        lock = manager.Lock()

    # Map each scheduled task back to the source file it is formatting.
    tasks = {}
    for src in sorted(sources):
        job = loop.run_in_executor(
            executor, format_file_in_place, src, fast, mode, write_back, lock
        )
        tasks[asyncio.ensure_future(job)] = src

    # Live view: shrinks as finished tasks are popped from `tasks` below.
    pending = tasks.keys()
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(signum, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass

    while pending:
        finished, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
                continue
            failure = task.exception()
            if failure:
                report.failed(src, str(failure))
                continue
            changed = Changed.YES if task.result() else Changed.NO
            # If the file was written back or was successfully checked as
            # well-formatted, store this information in the cache.
            checked_clean = write_back is WriteBack.CHECK and changed is Changed.NO
            if write_back is WriteBack.YES or checked_clean:
                sources_to_cache.append(src)
            report.done(src, changed)

    if cancelled:
        await asyncio.gather(*cancelled, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)