Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| import asyncio | |
| import pickle | |
| import threading | |
| import time | |
| from collections import OrderedDict | |
| from typing import Generic, Union | |
| from loguru import logger | |
| from typing_extensions import override | |
| from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType | |
| from langflow.services.cache.utils import CACHE_MISS | |
| class ThreadingInMemoryCache(CacheService, Generic[LockType]): | |
| """A simple in-memory cache using an OrderedDict. | |
| This cache supports setting a maximum size and expiration time for cached items. | |
| When the cache is full, it uses a Least Recently Used (LRU) eviction policy. | |
| Thread-safe using a threading Lock. | |
| Attributes: | |
| max_size (int, optional): Maximum number of items to store in the cache. | |
| expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. | |
| Example: | |
| cache = InMemoryCache(max_size=3, expiration_time=5) | |
| # setting cache values | |
| cache.set("a", 1) | |
| cache.set("b", 2) | |
| cache["c"] = 3 | |
| # getting cache values | |
| a = cache.get("a") | |
| b = cache["b"] | |
| """ | |
| def __init__(self, max_size=None, expiration_time=60 * 60) -> None: | |
| """Initialize a new InMemoryCache instance. | |
| Args: | |
| max_size (int, optional): Maximum number of items to store in the cache. | |
| expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. | |
| """ | |
| self._cache: OrderedDict = OrderedDict() | |
| self._lock = threading.RLock() | |
| self.max_size = max_size | |
| self.expiration_time = expiration_time | |
| def get(self, key, lock: Union[threading.Lock, None] = None): # noqa: UP007 | |
| """Retrieve an item from the cache. | |
| Args: | |
| key: The key of the item to retrieve. | |
| lock: A lock to use for the operation. | |
| Returns: | |
| The value associated with the key, or CACHE_MISS if the key is not found or the item has expired. | |
| """ | |
| with lock or self._lock: | |
| return self._get_without_lock(key) | |
| def _get_without_lock(self, key): | |
| """Retrieve an item from the cache without acquiring the lock.""" | |
| if item := self._cache.get(key): | |
| if self.expiration_time is None or time.time() - item["time"] < self.expiration_time: | |
| # Move the key to the end to make it recently used | |
| self._cache.move_to_end(key) | |
| # Check if the value is pickled | |
| return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] | |
| self.delete(key) | |
| return CACHE_MISS | |
| def set(self, key, value, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007 | |
| """Add an item to the cache. | |
| If the cache is full, the least recently used item is evicted. | |
| Args: | |
| key: The key of the item. | |
| value: The value to cache. | |
| lock: A lock to use for the operation. | |
| """ | |
| with lock or self._lock: | |
| if key in self._cache: | |
| # Remove existing key before re-inserting to update order | |
| self.delete(key) | |
| elif self.max_size and len(self._cache) >= self.max_size: | |
| # Remove least recently used item | |
| self._cache.popitem(last=False) | |
| # pickle locally to mimic Redis | |
| self._cache[key] = {"value": value, "time": time.time()} | |
| def upsert(self, key, value, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007 | |
| """Inserts or updates a value in the cache. | |
| If the existing value and the new value are both dictionaries, they are merged. | |
| Args: | |
| key: The key of the item. | |
| value: The value to insert or update. | |
| lock: A lock to use for the operation. | |
| """ | |
| with lock or self._lock: | |
| existing_value = self._get_without_lock(key) | |
| if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict): | |
| existing_value.update(value) | |
| value = existing_value | |
| self.set(key, value) | |
| def get_or_set(self, key, value, lock: Union[threading.Lock, None] = None): # noqa: UP007 | |
| """Retrieve an item from the cache. | |
| If the item does not exist, set it with the provided value. | |
| Args: | |
| key: The key of the item. | |
| value: The value to cache if the item doesn't exist. | |
| lock: A lock to use for the operation. | |
| Returns: | |
| The cached value associated with the key. | |
| """ | |
| with lock or self._lock: | |
| if key in self._cache: | |
| return self.get(key) | |
| self.set(key, value) | |
| return value | |
| def delete(self, key, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007 | |
| with lock or self._lock: | |
| self._cache.pop(key, None) | |
| def clear(self, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007 | |
| """Clear all items from the cache.""" | |
| with lock or self._lock: | |
| self._cache.clear() | |
| def contains(self, key) -> bool: | |
| """Check if the key is in the cache.""" | |
| return key in self._cache | |
| def __contains__(self, key) -> bool: | |
| """Check if the key is in the cache.""" | |
| return self.contains(key) | |
| def __getitem__(self, key): | |
| """Retrieve an item from the cache using the square bracket notation.""" | |
| return self.get(key) | |
| def __setitem__(self, key, value) -> None: | |
| """Add an item to the cache using the square bracket notation.""" | |
| self.set(key, value) | |
| def __delitem__(self, key) -> None: | |
| """Remove an item from the cache using the square bracket notation.""" | |
| self.delete(key) | |
| def __len__(self) -> int: | |
| """Return the number of items in the cache.""" | |
| return len(self._cache) | |
| def __repr__(self) -> str: | |
| """Return a string representation of the InMemoryCache instance.""" | |
| return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})" | |
| class RedisCache(AsyncBaseCacheService, Generic[LockType]): | |
| """A Redis-based cache implementation. | |
| This cache supports setting an expiration time for cached items. | |
| Attributes: | |
| expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. | |
| Example: | |
| cache = RedisCache(expiration_time=5) | |
| # setting cache values | |
| cache.set("a", 1) | |
| cache.set("b", 2) | |
| cache["c"] = 3 | |
| # getting cache values | |
| a = cache.get("a") | |
| b = cache["b"] | |
| """ | |
| def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60) -> None: | |
| """Initialize a new RedisCache instance. | |
| Args: | |
| host (str, optional): Redis host. | |
| port (int, optional): Redis port. | |
| db (int, optional): Redis DB. | |
| url (str, optional): Redis URL. | |
| expiration_time (int, optional): Time in seconds after which a | |
| cached item expires. Default is 1 hour. | |
| """ | |
| try: | |
| from redis.asyncio import StrictRedis | |
| except ImportError as exc: | |
| msg = ( | |
| "RedisCache requires the redis-py package." | |
| " Please install Langflow with the deploy extra: pip install langflow[deploy]" | |
| ) | |
| raise ImportError(msg) from exc | |
| logger.warning( | |
| "RedisCache is an experimental feature and may not work as expected." | |
| " Please report any issues to our GitHub repository." | |
| ) | |
| if url: | |
| self._client = StrictRedis.from_url(url) | |
| else: | |
| self._client = StrictRedis(host=host, port=port, db=db) | |
| self.expiration_time = expiration_time | |
| # check connection | |
| def is_connected(self) -> bool: | |
| """Check if the Redis client is connected.""" | |
| import redis | |
| try: | |
| asyncio.run(self._client.ping()) | |
| except redis.exceptions.ConnectionError: | |
| logger.exception("RedisCache could not connect to the Redis server") | |
| return False | |
| return True | |
| async def get(self, key, lock=None): | |
| if key is None: | |
| return CACHE_MISS | |
| value = await self._client.get(str(key)) | |
| return pickle.loads(value) if value else CACHE_MISS | |
| async def set(self, key, value, lock=None) -> None: | |
| try: | |
| if pickled := pickle.dumps(value): | |
| result = await self._client.setex(str(key), self.expiration_time, pickled) | |
| if not result: | |
| msg = "RedisCache could not set the value." | |
| raise ValueError(msg) | |
| except TypeError as exc: | |
| msg = "RedisCache only accepts values that can be pickled. " | |
| raise TypeError(msg) from exc | |
| async def upsert(self, key, value, lock=None) -> None: | |
| """Inserts or updates a value in the cache. | |
| If the existing value and the new value are both dictionaries, they are merged. | |
| Args: | |
| key: The key of the item. | |
| value: The value to insert or update. | |
| lock: A lock to use for the operation. | |
| """ | |
| if key is None: | |
| return | |
| existing_value = await self.get(key) | |
| if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict): | |
| existing_value.update(value) | |
| value = existing_value | |
| await self.set(key, value) | |
| async def delete(self, key, lock=None) -> None: | |
| await self._client.delete(key) | |
| async def clear(self, lock=None) -> None: | |
| """Clear all items from the cache.""" | |
| await self._client.flushdb() | |
| async def contains(self, key) -> bool: | |
| """Check if the key is in the cache.""" | |
| if key is None: | |
| return False | |
| return bool(await self._client.exists(str(key))) | |
| def __repr__(self) -> str: | |
| """Return a string representation of the RedisCache instance.""" | |
| return f"RedisCache(expiration_time={self.expiration_time})" | |
| class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]): | |
| def __init__(self, max_size=None, expiration_time=3600) -> None: | |
| self.cache: OrderedDict = OrderedDict() | |
| self.lock = asyncio.Lock() | |
| self.max_size = max_size | |
| self.expiration_time = expiration_time | |
| async def get(self, key, lock: asyncio.Lock | None = None): | |
| async with lock or self.lock: | |
| return await self._get(key) | |
| async def _get(self, key): | |
| item = self.cache.get(key, None) | |
| if item: | |
| if time.time() - item["time"] < self.expiration_time: | |
| self.cache.move_to_end(key) | |
| return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] | |
| logger.info(f"Cache item for key '{key}' has expired and will be deleted.") | |
| await self._delete(key) # Log before deleting the expired item | |
| return CACHE_MISS | |
| async def set(self, key, value, lock: asyncio.Lock | None = None) -> None: | |
| async with lock or self.lock: | |
| await self._set( | |
| key, | |
| value, | |
| ) | |
| async def _set(self, key, value) -> None: | |
| if self.max_size and len(self.cache) >= self.max_size: | |
| self.cache.popitem(last=False) | |
| self.cache[key] = {"value": value, "time": time.time()} | |
| self.cache.move_to_end(key) | |
| async def delete(self, key, lock: asyncio.Lock | None = None) -> None: | |
| async with lock or self.lock: | |
| await self._delete(key) | |
| async def _delete(self, key) -> None: | |
| if key in self.cache: | |
| del self.cache[key] | |
| async def clear(self, lock: asyncio.Lock | None = None) -> None: | |
| async with lock or self.lock: | |
| await self._clear() | |
| async def _clear(self) -> None: | |
| self.cache.clear() | |
| async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None: | |
| await self._upsert(key, value, lock) | |
| async def _upsert(self, key, value, lock: asyncio.Lock | None = None) -> None: | |
| existing_value = await self.get(key, lock) | |
| if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict): | |
| existing_value.update(value) | |
| value = existing_value | |
| await self.set(key, value, lock) | |
| async def contains(self, key) -> bool: | |
| return key in self.cache | |
