Tai Truong
fix readme
d202ada
import re
import threading
from contextlib import contextmanager
from pathlib import Path
from filelock import FileLock
from platformdirs import user_cache_dir
class KeyedMemoryLockManager:
"""A manager for acquiring and releasing memory locks based on a key."""
def __init__(self) -> None:
self.locks: dict[str, threading.Lock] = {}
self.global_lock = threading.Lock()
def _get_lock(self, key: str):
with self.global_lock:
if key not in self.locks:
self.locks[key] = threading.Lock()
return self.locks[key]
@contextmanager
def lock(self, key: str):
lock = self._get_lock(key)
lock.acquire()
try:
yield
finally:
lock.release()
class KeyedWorkerLockManager:
"""A manager for acquiring locks between workers based on a key."""
def __init__(self) -> None:
self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks"
def _validate_key(self, key: str) -> bool:
"""Validate that the string only contains alphanumeric characters and underscores.
Parameters:
s (str): The string to validate.
Returns:
bool: True if the string is valid, False otherwise.
"""
pattern = re.compile(r"^\w+$")
return bool(pattern.match(key))
@contextmanager
def lock(self, key: str):
if not self._validate_key(key):
msg = f"Invalid key: {key}"
raise ValueError(msg)
lock = FileLock(self.locks_dir / key)
with lock:
yield