File size: 4,437 Bytes
7885a28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# JSONDecodeError was introduced in requests=2.27 released in 2022.
# This allows us to support older requests for users
# More information: https://github.com/psf/requests/pull/5856
try:
from requests import JSONDecodeError # type: ignore # noqa: F401
except ImportError:
try:
from simplejson import JSONDecodeError # type: ignore # noqa: F401
except ImportError:
from json import JSONDecodeError # type: ignore # noqa: F401
import contextlib
import os
import shutil
import stat
import tempfile
import time
from functools import partial
from pathlib import Path
from typing import Callable, Generator, Optional, Union
import yaml
from filelock import BaseFileLock, FileLock, SoftFileLock, Timeout
from .. import constants
from . import logging
logger = logging.get_logger(__name__)
# Wrap `yaml.dump` to set `allow_unicode=True` by default.
#
# Example:
# ```py
# >>> yaml.dump({"emoji": "π", "some unicode": "ζ₯ζ¬γ"})
# 'emoji: "\\U0001F440"\nsome unicode: "\\u65E5\\u672C\\u304B"\n'
#
# >>> yaml_dump({"emoji": "π", "some unicode": "ζ₯ζ¬γ"})
# 'emoji: "π"\nsome unicode: "ζ₯ζ¬γ"\n'
# ```
yaml_dump: Callable[..., str] = partial(yaml.dump, stream=None, allow_unicode=True) # type: ignore
@contextlib.contextmanager
def SoftTemporaryDirectory(
suffix: Optional[str] = None,
prefix: Optional[str] = None,
dir: Optional[Union[Path, str]] = None,
**kwargs,
) -> Generator[Path, None, None]:
"""
Context manager to create a temporary directory and safely delete it.
If tmp directory cannot be deleted normally, we set the WRITE permission and retry.
If cleanup still fails, we give up but don't raise an exception. This is equivalent
to `tempfile.TemporaryDirectory(..., ignore_cleanup_errors=True)` introduced in
Python 3.10.
See https://www.scivision.dev/python-tempfile-permission-error-windows/.
"""
tmpdir = tempfile.TemporaryDirectory(prefix=prefix, suffix=suffix, dir=dir, **kwargs)
yield Path(tmpdir.name).resolve()
try:
# First once with normal cleanup
shutil.rmtree(tmpdir.name)
except Exception:
# If failed, try to set write permission and retry
try:
shutil.rmtree(tmpdir.name, onerror=_set_write_permission_and_retry)
except Exception:
pass
# And finally, cleanup the tmpdir.
# If it fails again, give up but do not throw error
try:
tmpdir.cleanup()
except Exception:
pass
def _set_write_permission_and_retry(func, path, excinfo):
os.chmod(path, stat.S_IWRITE)
func(path)
@contextlib.contextmanager
def WeakFileLock(
lock_file: Union[str, Path], *, timeout: Optional[float] = None
) -> Generator[BaseFileLock, None, None]:
"""A filelock with some custom logic.
This filelock is weaker than the default filelock in that:
1. It won't raise an exception if release fails.
2. It will default to a SoftFileLock if the filesystem does not support flock.
An INFO log message is emitted every 10 seconds if the lock is not acquired immediately.
If a timeout is provided, a `filelock.Timeout` exception is raised if the lock is not acquired within the timeout.
"""
log_interval = constants.FILELOCK_LOG_EVERY_SECONDS
lock = FileLock(lock_file, timeout=log_interval)
start_time = time.time()
while True:
elapsed_time = time.time() - start_time
if timeout is not None and elapsed_time >= timeout:
raise Timeout(str(lock_file))
try:
lock.acquire(timeout=min(log_interval, timeout - elapsed_time) if timeout else log_interval)
except Timeout:
logger.info(
f"Still waiting to acquire lock on {lock_file} (elapsed: {time.time() - start_time:.1f} seconds)"
)
except NotImplementedError as e:
if "use SoftFileLock instead" in str(e):
logger.warning(
"FileSystem does not appear to support flock. Falling back to SoftFileLock for %s", lock_file
)
lock = SoftFileLock(lock_file, timeout=log_interval)
continue
else:
break
try:
yield lock
finally:
try:
lock.release()
except OSError:
try:
Path(lock_file).unlink()
except OSError:
pass
|