from __future__ import annotations

import glob
import logging
import os
import shutil
import stat
import sys
import tempfile
import weakref
from typing import ClassVar

import locket

import dask
logger = logging.getLogger(__name__)

# Suffix appended to a work directory's path to form its lock-file name;
# a locked "<dir>.dirlock" file marks the directory as still in use.
DIR_LOCK_EXT = ".dirlock"
def is_locking_enabled():
    """Tell whether per-directory file locking is enabled in the dask config."""
    flag = dask.config.get("distributed.worker.use-file-locking")
    return flag
def safe_unlink(path):
    """Best-effort removal of *path*: a missing file is silently ignored,
    any other OS-level failure is logged but not raised."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        # Already gone -- perhaps it was removed by someone else?
        return
    except OSError as e:
        logger.error(f"Failed to remove {path}: {e}")
class WorkDir:
    """
    A temporary work directory inside a WorkSpace.

    While file locking is enabled (see ``is_locking_enabled``), the directory
    is guarded by a ``<dir_path>.dirlock`` lock file held for the lifetime of
    this object, so other processes sharing the WorkSpace can tell it is
    still in use.  Cleanup happens either explicitly via :meth:`release` or
    automatically through a ``weakref.finalize`` callback.
    """

    dir_path: str
    _lock_path: str
    _finalizer: weakref.finalize

    def __init__(self, workspace, name=None, prefix=None):
        # Exactly one naming scheme may be given: a fixed *name* or a
        # randomized directory with the given *prefix*.
        assert name is None or prefix is None
        if name is None:
            self.dir_path = tempfile.mkdtemp(prefix=prefix, dir=workspace.base_dir)
        else:
            self.dir_path = os.path.join(workspace.base_dir, name)
            os.mkdir(self.dir_path)  # it shouldn't already exist

        if is_locking_enabled():
            try:
                # NOTE: was os.path.join(self.dir_path + DIR_LOCK_EXT) — a
                # pointless single-argument join; plain concatenation is
                # equivalent and clearer.
                self._lock_path = self.dir_path + DIR_LOCK_EXT
                assert not os.path.exists(self._lock_path)
                logger.debug("Locking %r...", self._lock_path)
                # Avoid a race condition before locking the file
                # by taking the global lock
                try:
                    with workspace._global_lock():
                        self._lock_file = locket.lock_file(self._lock_path)
                        self._lock_file.acquire()
                except OSError:
                    # Best effort: carry on unlocked rather than failing to
                    # create the work directory at all.
                    # (logger.exception already records the traceback; the
                    # previous explicit exc_info=True was redundant.)
                    logger.exception(
                        "Could not acquire workspace lock on "
                        "path: %s ."
                        "Continuing without lock. "
                        "This may result in workspaces not being "
                        "cleaned up",
                        self._lock_path,
                    )
                    self._lock_file = None
            except Exception:
                # Setup failed half-way: don't leave the directory behind.
                shutil.rmtree(self.dir_path, ignore_errors=True)
                raise
            workspace._known_locks.add(self._lock_path)
            self._finalizer = weakref.finalize(
                self,
                self._finalize,
                workspace,
                self._lock_path,
                self._lock_file,
                self.dir_path,
            )
        else:
            self._finalizer = weakref.finalize(
                self, self._finalize, workspace, None, None, self.dir_path
            )

    def release(self):
        """
        Dispose of this directory.
        """
        self._finalizer()

    @classmethod
    def _finalize(cls, workspace, lock_path, lock_file, dir_path):
        # Purge the directory, then release and remove its lock file.
        #
        # BUG FIX: this must be a @classmethod.  weakref.finalize() stores
        # the callback it is given, and a bound *instance* method would hold
        # a strong reference to the WorkDir, keeping it alive forever and
        # preventing the finalizer from ever firing on garbage collection.
        try:
            workspace._purge_directory(dir_path)
        finally:
            if lock_file is not None:
                lock_file.release()
            if lock_path is not None:
                workspace._known_locks.remove(lock_path)
                safe_unlink(lock_path)
class WorkSpace:
    """
    An on-disk workspace that tracks disposable work directories inside it.
    If a process crashes or another event left stale directories behind,
    this will be detected and the directories purged.
    """

    base_dir: str
    _global_lock_path: str
    _purge_lock_path: str

    # Keep track of all locks known to this process, to avoid several
    # WorkSpaces to step on each other's toes
    _known_locks: ClassVar[set[str]] = set()

    def __init__(self, base_dir: str):
        self.base_dir = self._init_workspace(base_dir)
        # "global.lock" serializes lock-file creation/listing across
        # processes; "purge.lock" ensures only one process purges at a time.
        self._global_lock_path = os.path.join(self.base_dir, "global.lock")
        self._purge_lock_path = os.path.join(self.base_dir, "purge.lock")

    def _init_workspace(self, base_dir: str) -> str:
        """Create base_dir if it doesn't exist.
        If base_dir already exists but it's not writeable, change the name.

        Returns the absolute path of the directory actually used.
        """
        base_dir = os.path.abspath(base_dir)
        try_dirs = [base_dir]
        # Note: WINDOWS constant doesn't work with `mypy --platform win32`
        if sys.platform != "win32":
            # - os.getlogin() raises OSError on containerized environments
            # - os.getuid() does not exist in Windows
            try_dirs.append(f"{base_dir}-{os.getuid()}")

        for try_dir in try_dirs:
            try:
                os.makedirs(try_dir)
            except FileExistsError:
                # Directory exists: verify we can actually write into it.
                try:
                    with tempfile.TemporaryFile(dir=try_dir):
                        pass
                except PermissionError:
                    continue
            return try_dir

        # If we reached this, we're likely in a containerized environment where /tmp
        # has been shared between containers through a mountpoint, every container
        # has an external $UID, but the internal one is the same for all.
        return tempfile.mkdtemp(prefix=base_dir + "-")

    def _global_lock(self, **kwargs):
        # Inter-process lock guarding lock-file creation and listing.
        return locket.lock_file(self._global_lock_path, **kwargs)

    def _purge_lock(self, **kwargs):
        # Inter-process lock ensuring a single concurrent purger.
        return locket.lock_file(self._purge_lock_path, **kwargs)

    def _purge_leftovers(self):
        """Purge directories whose lock files are stale.

        Returns the list of purged lock-file paths (empty when locking is
        disabled or another process holds the relevant locks).
        """
        if not is_locking_enabled():
            return []
        # List candidates with the global lock taken, to avoid purging
        # a lock file that was just created but not yet locked
        # (see WorkDir.__init__)
        lock = self._global_lock(timeout=0)
        try:
            lock.acquire()
        except locket.LockError:
            # No need to waste time here if someone else is busy doing
            # something on this workspace
            return []
        else:
            try:
                candidates = list(self._list_unknown_locks())
            finally:
                lock.release()

        # No need to hold the global lock here, especially as purging
        # can take time. Instead take the purge lock to avoid two
        # processes purging at once.
        purged = []
        lock = self._purge_lock(timeout=0)
        try:
            lock.acquire()
        except locket.LockError:
            # No need for two processes to purge one after another
            pass
        else:
            try:
                for path in candidates:
                    if self._check_lock_or_purge(path):
                        purged.append(path)
            finally:
                lock.release()
        return purged

    def _list_unknown_locks(self):
        # Yield lock-file paths in base_dir not owned by this process.
        for p in glob.glob(os.path.join(self.base_dir, "*" + DIR_LOCK_EXT)):
            try:
                st = os.stat(p)
            except OSError:
                # May have been removed in the meantime
                pass
            else:
                # XXX restrict to files owned by current user?
                if stat.S_ISREG(st.st_mode):
                    yield p

    def _purge_directory(self, dir_path):
        # Remove the directory tree; individual failures are logged,
        # not raised (see _on_remove_error).
        shutil.rmtree(dir_path, onerror=self._on_remove_error)

    def _check_lock_or_purge(self, lock_path):
        """
        Try locking the given path, if it fails it's in use,
        otherwise the corresponding directory is deleted.
        Return True if the lock was stale.
        """
        assert lock_path.endswith(DIR_LOCK_EXT)
        if lock_path in self._known_locks:
            # Avoid touching a lock that we know is already taken
            return False
        logger.debug("Checking lock file %r...", lock_path)
        lock = locket.lock_file(lock_path, timeout=0)
        try:
            lock.acquire()
        except locket.LockError:
            # Lock file still in use, ignore
            return False
        try:
            # Lock file is stale, therefore purge corresponding directory
            dir_path = lock_path[: -len(DIR_LOCK_EXT)]
            if os.path.exists(dir_path):
                logger.info("Found stale lock file and directory %r, purging", dir_path)
                self._purge_directory(dir_path)
        finally:
            lock.release()
        # Clean up lock file after we released it
        safe_unlink(lock_path)
        return True

    def _on_remove_error(self, func, path, exc_info):
        # shutil.rmtree onerror callback: log and continue.
        typ, exc, tb = exc_info
        logger.error("Failed to remove %r (failed in %r): %s", path, func, str(exc))

    def new_work_dir(self, **kwargs):
        """
        Create and return a new WorkDir in this WorkSpace.
        Either the *prefix* or *name* parameter should be given
        (*prefix* is preferred as it avoids potential collisions)

        Parameters
        ----------
        prefix : str (optional)
            The prefix of the temporary subdirectory name for the workdir
        name : str (optional)
            The subdirectory name for the workdir
        """
        try:
            self._purge_leftovers()
        except OSError:
            # BUG FIX: the "%s" placeholder previously had no matching
            # argument, making this log call itself fail.
            logger.error(
                "Failed to clean up lingering worker directories in path: %s ",
                self.base_dir,
                exc_info=True,
            )
        return WorkDir(self, **kwargs)