Spaces:
Runtime error
Runtime error
"""Caching of formatted files with feature-based invalidation.""" | |
import os | |
import pickle | |
import tempfile | |
from pathlib import Path | |
from typing import Dict, Iterable, Set, Tuple | |
from platformdirs import user_cache_dir | |
from _black_version import version as __version__ | |
from black.mode import Mode | |
# types | |
Timestamp = float | |
FileSize = int | |
CacheInfo = Tuple[Timestamp, FileSize] | |
Cache = Dict[str, CacheInfo] | |
def get_cache_dir() -> Path: | |
"""Get the cache directory used by black. | |
Users can customize this directory on all systems using `BLACK_CACHE_DIR` | |
environment variable. By default, the cache directory is the user cache directory | |
under the black application. | |
This result is immediately set to a constant `black.cache.CACHE_DIR` as to avoid | |
repeated calls. | |
""" | |
# NOTE: Function mostly exists as a clean way to test getting the cache directory. | |
default_cache_dir = user_cache_dir("black", version=__version__) | |
cache_dir = Path(os.environ.get("BLACK_CACHE_DIR", default_cache_dir)) | |
return cache_dir | |
CACHE_DIR = get_cache_dir() | |
def read_cache(mode: Mode) -> Cache: | |
"""Read the cache if it exists and is well formed. | |
If it is not well formed, the call to write_cache later should resolve the issue. | |
""" | |
cache_file = get_cache_file(mode) | |
if not cache_file.exists(): | |
return {} | |
with cache_file.open("rb") as fobj: | |
try: | |
cache: Cache = pickle.load(fobj) | |
except (pickle.UnpicklingError, ValueError, IndexError): | |
return {} | |
return cache | |
def get_cache_file(mode: Mode) -> Path: | |
return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle" | |
def get_cache_info(path: Path) -> CacheInfo: | |
"""Return the information used to check if a file is already formatted or not.""" | |
stat = path.stat() | |
return stat.st_mtime, stat.st_size | |
def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]: | |
"""Split an iterable of paths in `sources` into two sets. | |
The first contains paths of files that modified on disk or are not in the | |
cache. The other contains paths to non-modified files. | |
""" | |
todo, done = set(), set() | |
for src in sources: | |
res_src = src.resolve() | |
if cache.get(str(res_src)) != get_cache_info(res_src): | |
todo.add(src) | |
else: | |
done.add(src) | |
return todo, done | |
def write_cache(cache: Cache, sources: Iterable[Path], mode: Mode) -> None: | |
"""Update the cache file.""" | |
cache_file = get_cache_file(mode) | |
try: | |
CACHE_DIR.mkdir(parents=True, exist_ok=True) | |
new_cache = { | |
**cache, | |
**{str(src.resolve()): get_cache_info(src) for src in sources}, | |
} | |
with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f: | |
pickle.dump(new_cache, f, protocol=4) | |
os.replace(f.name, cache_file) | |
except OSError: | |
pass | |