|
|
|
|
|
|
|
try: |
|
from requests import JSONDecodeError |
|
except ImportError: |
|
try: |
|
from simplejson import JSONDecodeError |
|
except ImportError: |
|
from json import JSONDecodeError |
|
import contextlib |
|
import os |
|
import shutil |
|
import stat |
|
import tempfile |
|
import time |
|
from functools import partial |
|
from pathlib import Path |
|
from typing import Callable, Generator, Optional, Union |
|
|
|
import yaml |
|
from filelock import BaseFileLock, FileLock, SoftFileLock, Timeout |
|
|
|
from .. import constants |
|
from . import logging |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.get_logger(__name__)


# `yaml.dump` with two keyword arguments pre-bound:
#   - `allow_unicode=True`, so that non-ASCII characters are not escaped in the output
#     (per PyYAML's documented behavior for this flag);
#   - `stream=None`, so that the serialized document is returned as a string instead of
#     being written to a stream, matching the `Callable[..., str]` annotation.
yaml_dump: Callable[..., str] = partial(
    yaml.dump,
    allow_unicode=True,
    stream=None,
)
|
|
|
|
|
@contextlib.contextmanager
def SoftTemporaryDirectory(
    suffix: Optional[str] = None,
    prefix: Optional[str] = None,
    dir: Optional[Union[Path, str]] = None,
    **kwargs,
) -> Generator[Path, None, None]:
    """
    Context manager to create a temporary directory and safely delete it.

    If tmp directory cannot be deleted normally, we set the WRITE permission and retry.
    If cleanup still fails, we give up but don't raise an exception. This is equivalent
    to `tempfile.TemporaryDirectory(..., ignore_cleanup_errors=True)` introduced in
    Python 3.10.

    Cleanup is attempted whether the `with` body exits normally or raises; an exception
    raised by the body is propagated unchanged once cleanup has been attempted.

    See https://www.scivision.dev/python-tempfile-permission-error-windows/.

    Args:
        suffix: Forwarded to `tempfile.TemporaryDirectory`.
        prefix: Forwarded to `tempfile.TemporaryDirectory`.
        dir: Forwarded to `tempfile.TemporaryDirectory`.
        **kwargs: Any additional keyword arguments for `tempfile.TemporaryDirectory`.

    Yields:
        The resolved `Path` of the newly created temporary directory.
    """
    tmpdir = tempfile.TemporaryDirectory(prefix=prefix, suffix=suffix, dir=dir, **kwargs)
    try:
        yield Path(tmpdir.name).resolve()
    finally:
        # Run the cleanup from `finally` so that an exception raised inside the `with`
        # body does not skip it and leave the directory behind.
        try:
            # First attempt: plain recursive removal.
            shutil.rmtree(tmpdir.name)
        except Exception:
            # Second attempt: add the write permission on each failing path and retry.
            try:
                shutil.rmtree(tmpdir.name, onerror=_set_write_permission_and_retry)
            except Exception:
                # Best effort only: cleanup errors are deliberately not raised.
                pass

        # Finally, let the `TemporaryDirectory` object do its own cleanup.
        # If that fails as well, give up silently.
        try:
            tmpdir.cleanup()
        except Exception:
            pass
|
|
|
|
|
def _set_write_permission_and_retry(func, path, excinfo): |
|
os.chmod(path, stat.S_IWRITE) |
|
func(path) |
|
|
|
|
|
@contextlib.contextmanager
def WeakFileLock(
    lock_file: Union[str, Path], *, timeout: Optional[float] = None
) -> Generator[BaseFileLock, None, None]:
    """A filelock with some custom logic.

    This filelock is weaker than the default filelock in that:
    1. It won't raise an exception if release fails with an `OSError`.
    2. It will default to a SoftFileLock if the filesystem does not support flock.

    An INFO log message is emitted every `constants.FILELOCK_LOG_EVERY_SECONDS` seconds
    while the lock has not been acquired.
    If a timeout is provided, a `filelock.Timeout` exception is raised if the lock is not acquired within the timeout.

    Args:
        lock_file: Path of the lock file.
        timeout: Maximum number of seconds to wait for the lock. `None` waits indefinitely.

    Yields:
        The acquired lock object (a `FileLock`, or a `SoftFileLock` after fallback).

    Raises:
        filelock.Timeout: If `timeout` is set and elapses before the lock is acquired.
        NotImplementedError: If acquiring the lock raises a `NotImplementedError` that is
            not the "use SoftFileLock instead" hint.
    """
    log_interval = constants.FILELOCK_LOG_EVERY_SECONDS
    lock = FileLock(lock_file, timeout=log_interval)
    # Monotonic clock: wall-clock adjustments must not shorten or extend the timeout.
    start_time = time.monotonic()

    while True:
        elapsed_time = time.monotonic() - start_time
        if timeout is not None and elapsed_time >= timeout:
            raise Timeout(str(lock_file))

        # Wait at most one log interval per attempt, and never beyond the overall timeout.
        if timeout is None:
            attempt_timeout = log_interval
        else:
            attempt_timeout = min(log_interval, timeout - elapsed_time)

        try:
            lock.acquire(timeout=attempt_timeout)
        except Timeout:
            logger.info(
                f"Still waiting to acquire lock on {lock_file} (elapsed: {time.monotonic() - start_time:.1f} seconds)"
            )
        except NotImplementedError as e:
            if "use SoftFileLock instead" not in str(e):
                # Unknown failure: propagate it instead of retrying in a tight loop forever.
                raise
            logger.warning(
                "FileSystem does not appear to support flock. Falling back to SoftFileLock for %s", lock_file
            )
            # Retry the acquisition with the soft lock on the next iteration.
            lock = SoftFileLock(lock_file, timeout=log_interval)
        else:
            break

    try:
        yield lock
    finally:
        try:
            lock.release()
        except OSError:
            # Release failed: fall back to removing the lock file, ignoring any further error.
            try:
                Path(lock_file).unlink()
            except OSError:
                pass
|
|