Spaces:
Running
Running
import re | |
import threading | |
from contextlib import contextmanager | |
from pathlib import Path | |
from filelock import FileLock | |
from platformdirs import user_cache_dir | |
class KeyedMemoryLockManager: | |
"""A manager for acquiring and releasing memory locks based on a key.""" | |
def __init__(self) -> None: | |
self.locks: dict[str, threading.Lock] = {} | |
self.global_lock = threading.Lock() | |
def _get_lock(self, key: str): | |
with self.global_lock: | |
if key not in self.locks: | |
self.locks[key] = threading.Lock() | |
return self.locks[key] | |
def lock(self, key: str): | |
lock = self._get_lock(key) | |
lock.acquire() | |
try: | |
yield | |
finally: | |
lock.release() | |
class KeyedWorkerLockManager: | |
"""A manager for acquiring locks between workers based on a key.""" | |
def __init__(self) -> None: | |
self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks" | |
def _validate_key(self, key: str) -> bool: | |
"""Validate that the string only contains alphanumeric characters and underscores. | |
Parameters: | |
s (str): The string to validate. | |
Returns: | |
bool: True if the string is valid, False otherwise. | |
""" | |
pattern = re.compile(r"^\w+$") | |
return bool(pattern.match(key)) | |
def lock(self, key: str): | |
if not self._validate_key(key): | |
msg = f"Invalid key: {key}" | |
raise ValueError(msg) | |
lock = FileLock(self.locks_dir / key) | |
with lock: | |
yield | |