Spaces:
Runtime error
Runtime error
File size: 6,359 Bytes
6eefbd7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
"""
Formatting many files at once via multiprocessing. Contains entrypoint and utilities.
NOTE: this module is only imported if we need to format several files at once.
"""
import asyncio
import logging
import os
import signal
import sys
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Manager
from pathlib import Path
from typing import Any, Iterable, Optional, Set
from mypy_extensions import mypyc_attr
from black import WriteBack, format_file_in_place
from black.cache import Cache, filter_cached, read_cache, write_cache
from black.mode import Mode
from black.output import err
from black.report import Changed, Report
def maybe_install_uvloop() -> None:
"""If our environment has uvloop installed we use it.
This is called only from command-line entry points to avoid
interfering with the parent process if Black is used as a library.
"""
try:
import uvloop
uvloop.install()
except ImportError:
pass
def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
"""asyncio signal handler that cancels all `tasks` and reports to stderr."""
err("Aborted!")
for task in tasks:
task.cancel()
def shutdown(loop: asyncio.AbstractEventLoop) -> None:
"""Cancel all pending tasks on `loop`, wait for them, and close the loop."""
try:
# This part is borrowed from asyncio/runners.py in Python 3.7b2.
to_cancel = [task for task in asyncio.all_tasks(loop) if not task.done()]
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
finally:
# `concurrent.futures.Future` objects cannot be cancelled once they
# are already running. There might be some when the `shutdown()` happened.
# Silence their logger's spew about the event loop being closed.
cf_logger = logging.getLogger("concurrent.futures")
cf_logger.setLevel(logging.CRITICAL)
loop.close()
# diff-shades depends on being to monkeypatch this function to operate. I know it's
# not ideal, but this shouldn't cause any issues ... hopefully. ~ichard26
@mypyc_attr(patchable=True)
def reformat_many(
sources: Set[Path],
fast: bool,
write_back: WriteBack,
mode: Mode,
report: Report,
workers: Optional[int],
) -> None:
"""Reformat multiple files using a ProcessPoolExecutor."""
maybe_install_uvloop()
executor: Executor
if workers is None:
workers = int(os.environ.get("BLACK_NUM_WORKERS", 0))
workers = workers or os.cpu_count() or 1
if sys.platform == "win32":
# Work around https://bugs.python.org/issue26903
workers = min(workers, 60)
try:
executor = ProcessPoolExecutor(max_workers=workers)
except (ImportError, NotImplementedError, OSError):
# we arrive here if the underlying system does not support multi-processing
# like in AWS Lambda or Termux, in which case we gracefully fallback to
# a ThreadPoolExecutor with just a single worker (more workers would not do us
# any good due to the Global Interpreter Lock)
executor = ThreadPoolExecutor(max_workers=1)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
schedule_formatting(
sources=sources,
fast=fast,
write_back=write_back,
mode=mode,
report=report,
loop=loop,
executor=executor,
)
)
finally:
try:
shutdown(loop)
finally:
asyncio.set_event_loop(None)
if executor is not None:
executor.shutdown()
async def schedule_formatting(
sources: Set[Path],
fast: bool,
write_back: WriteBack,
mode: Mode,
report: "Report",
loop: asyncio.AbstractEventLoop,
executor: "Executor",
) -> None:
"""Run formatting of `sources` in parallel using the provided `executor`.
(Use ProcessPoolExecutors for actual parallelism.)
`write_back`, `fast`, and `mode` options are passed to
:func:`format_file_in_place`.
"""
cache: Cache = {}
if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
cache = read_cache(mode)
sources, cached = filter_cached(cache, sources)
for src in sorted(cached):
report.done(src, Changed.CACHED)
if not sources:
return
cancelled = []
sources_to_cache = []
lock = None
if write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
# For diff output, we need locks to ensure we don't interleave output
# from different processes.
manager = Manager()
lock = manager.Lock()
tasks = {
asyncio.ensure_future(
loop.run_in_executor(
executor, format_file_in_place, src, fast, mode, write_back, lock
)
): src
for src in sorted(sources)
}
pending = tasks.keys()
try:
loop.add_signal_handler(signal.SIGINT, cancel, pending)
loop.add_signal_handler(signal.SIGTERM, cancel, pending)
except NotImplementedError:
# There are no good alternatives for these on Windows.
pass
while pending:
done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
src = tasks.pop(task)
if task.cancelled():
cancelled.append(task)
elif task.exception():
report.failed(src, str(task.exception()))
else:
changed = Changed.YES if task.result() else Changed.NO
# If the file was written back or was successfully checked as
# well-formatted, store this information in the cache.
if write_back is WriteBack.YES or (
write_back is WriteBack.CHECK and changed is Changed.NO
):
sources_to_cache.append(src)
report.done(src, changed)
if cancelled:
await asyncio.gather(*cancelled, return_exceptions=True)
if sources_to_cache:
write_cache(cache, sources_to_cache, mode)
|