import asyncio
import pickle
import threading
import time
from collections import OrderedDict
from typing import Generic, Union

from loguru import logger
from typing_extensions import override

from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType
from langflow.services.cache.utils import CACHE_MISS


class ThreadingInMemoryCache(CacheService, Generic[LockType]):
    """A simple in-memory cache using an OrderedDict.

    This cache supports setting a maximum size and expiration time for cached items.
    When the cache is full, it uses a Least Recently Used (LRU) eviction policy.
    Mutating operations are guarded by a re-entrant ``threading.RLock`` unless the
    caller supplies its own lock.

    Attributes:
        max_size (int, optional): Maximum number of items to store in the cache.
        expiration_time (int, optional): Time in seconds after which a cached item expires.
            Default is 1 hour. ``None`` disables expiration.

    Example:
        cache = ThreadingInMemoryCache(max_size=3, expiration_time=5)

        # setting cache values
        cache.set("a", 1)
        cache.set("b", 2)
        cache["c"] = 3

        # getting cache values
        a = cache.get("a")
        b = cache["b"]
    """

    def __init__(self, max_size=None, expiration_time=60 * 60) -> None:
        """Initialize a new ThreadingInMemoryCache instance.

        Args:
            max_size (int, optional): Maximum number of items to store in the cache.
            expiration_time (int, optional): Time in seconds after which a cached item expires.
                Default is 1 hour.
        """
        self._cache: OrderedDict = OrderedDict()
        self._lock = threading.RLock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    def get(self, key, lock: Union[threading.Lock, None] = None):  # noqa: UP007
        """Retrieve an item from the cache.

        Args:
            key: The key of the item to retrieve.
            lock: A lock to use for the operation. Defaults to the cache's own lock.

        Returns:
            The value associated with the key, or CACHE_MISS if the key is not found
            or the item has expired.
        """
        with lock or self._lock:
            return self._get_without_lock(key)

    def _get_without_lock(self, key):
        """Retrieve an item from the cache without acquiring the lock.

        Callers are expected to already hold the appropriate lock. An expired entry
        is removed and reported as CACHE_MISS.
        """
        item = self._cache.get(key)
        if not item:
            return CACHE_MISS

        if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
            # Move the key to the end to make it recently used
            self._cache.move_to_end(key)
            # NOTE(review): any ``bytes`` value is assumed to be a pickle payload, although
            # ``set`` stores values as-is. Raw bytes stored by a caller will be fed to
            # ``pickle.loads`` here - confirm that no untrusted bytes reach this cache.
            return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]

        # Expired: drop the entry directly instead of going through ``delete``,
        # which would acquire the lock again.
        self._cache.pop(key, None)
        return CACHE_MISS

    def set(self, key, value, lock: Union[threading.Lock, None] = None) -> None:  # noqa: UP007
        """Add an item to the cache.

        If the cache is full, the least recently used item is evicted.

        Args:
            key: The key of the item.
            value: The value to cache.
            lock: A lock to use for the operation. Defaults to the cache's own lock.
        """
        with lock or self._lock:
            if key in self._cache:
                # Remove existing key before re-inserting to update order
                self._cache.pop(key, None)
            elif self.max_size and len(self._cache) >= self.max_size:
                # Remove least recently used item
                self._cache.popitem(last=False)
            # The value is stored as-is (not pickled) together with its insertion time.
            self._cache[key] = {"value": value, "time": time.time()}

    def upsert(self, key, value, lock: Union[threading.Lock, None] = None) -> None:  # noqa: UP007
        """Inserts or updates a value in the cache.

        If the existing value and the new value are both dictionaries, they are merged.

        Args:
            key: The key of the item.
            value: The value to insert or update.
            lock: A lock to use for the operation. Defaults to the cache's own lock.
        """
        with lock or self._lock:
            existing_value = self._get_without_lock(key)
            if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
                existing_value.update(value)
                value = existing_value

            self.set(key, value)

    def get_or_set(self, key, value, lock: Union[threading.Lock, None] = None):  # noqa: UP007
        """Retrieve an item from the cache.

        If the item does not exist (or has expired), set it with the provided value.

        Args:
            key: The key of the item.
            value: The value to cache if the item doesn't exist.
            lock: A lock to use for the operation. Defaults to the cache's own lock.

        Returns:
            The cached value associated with the key.
        """
        with lock or self._lock:
            cached = self._get_without_lock(key)
            if cached is not CACHE_MISS:
                return cached
            # Missing or expired: store the provided value so the caller never
            # receives CACHE_MISS from this method.
            self.set(key, value)
            return value

    def delete(self, key, lock: Union[threading.Lock, None] = None) -> None:  # noqa: UP007
        """Remove an item from the cache; a missing key is ignored."""
        with lock or self._lock:
            self._cache.pop(key, None)

    def clear(self, lock: Union[threading.Lock, None] = None) -> None:  # noqa: UP007
        """Clear all items from the cache."""
        with lock or self._lock:
            self._cache.clear()

    def contains(self, key) -> bool:
        """Check if the key is in the cache.

        This is a plain membership test: it does not acquire the lock and does not
        check whether the entry has expired.
        """
        return key in self._cache

    def __contains__(self, key) -> bool:
        """Check if the key is in the cache."""
        return self.contains(key)

    def __getitem__(self, key):
        """Retrieve an item from the cache using the square bracket notation."""
        return self.get(key)

    def __setitem__(self, key, value) -> None:
        """Add an item to the cache using the square bracket notation."""
        self.set(key, value)

    def __delitem__(self, key) -> None:
        """Remove an item from the cache using the square bracket notation."""
        self.delete(key)

    def __len__(self) -> int:
        """Return the number of items in the cache."""
        return len(self._cache)

    def __repr__(self) -> str:
        """Return a string representation of the cache instance."""
        return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"


class RedisCache(AsyncBaseCacheService, Generic[LockType]):
    """A Redis-based cache implementation.

    This cache supports setting an expiration time for cached items. Values are
    pickled before being written and unpickled when read. Keys are converted with
    ``str()`` before being sent to Redis.

    Attributes:
        expiration_time (int, optional): Time in seconds after which a cached item expires.
            Default is 1 hour.

    Example:
        cache = RedisCache(expiration_time=5)

        # setting cache values
        await cache.set("a", 1)
        await cache.set("b", 2)

        # getting cache values
        a = await cache.get("a")
        b = await cache.get("b")
    """

    def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60) -> None:
        """Initialize a new RedisCache instance.

        Args:
            host (str, optional): Redis host.
            port (int, optional): Redis port.
            db (int, optional): Redis DB.
            url (str, optional): Redis URL. When given, it takes precedence over host/port/db.
            expiration_time (int, optional): Time in seconds after which a cached item expires.
                Default is 1 hour.

        Raises:
            ImportError: If the ``redis`` package is not installed.
        """
        try:
            from redis.asyncio import StrictRedis
        except ImportError as exc:
            msg = (
                "RedisCache requires the redis-py package."
                " Please install Langflow with the deploy extra: pip install langflow[deploy]"
            )
            raise ImportError(msg) from exc
        logger.warning(
            "RedisCache is an experimental feature and may not work as expected."
            " Please report any issues to our GitHub repository."
        )
        if url:
            self._client = StrictRedis.from_url(url)
        else:
            self._client = StrictRedis(host=host, port=port, db=db)
        self.expiration_time = expiration_time

    # check connection
    def is_connected(self) -> bool:
        """Check if the Redis client is connected.

        Returns:
            True if a ping succeeds, False if it raises a Redis ConnectionError.
        """
        import redis

        try:
            # NOTE(review): ``asyncio.run`` cannot be used from a thread that already has
            # a running event loop - confirm this is only called from synchronous code.
            asyncio.run(self._client.ping())
        except redis.exceptions.ConnectionError:
            logger.exception("RedisCache could not connect to the Redis server")
            return False
        return True

    @override
    async def get(self, key, lock=None):
        """Return the unpickled value stored under ``key``, or CACHE_MISS.

        Args:
            key: The key of the item to retrieve. ``None`` always yields CACHE_MISS.
            lock: Unused; accepted for interface compatibility.
        """
        if key is None:
            return CACHE_MISS
        value = await self._client.get(str(key))
        # NOTE(review): unpickling trusts whatever is stored in Redis - confirm the
        # Redis instance is not writable by untrusted parties.
        return pickle.loads(value) if value else CACHE_MISS

    @override
    async def set(self, key, value, lock=None) -> None:
        """Pickle ``value`` and store it under ``key`` with the configured expiration.

        Args:
            key: The key of the item.
            value: The value to cache; it must be picklable.
            lock: Unused; accepted for interface compatibility.

        Raises:
            ValueError: If Redis reports that the value was not stored.
            TypeError: If a TypeError is raised while pickling or storing the value.
        """
        try:
            if pickled := pickle.dumps(value):
                result = await self._client.setex(str(key), self.expiration_time, pickled)
                if not result:
                    msg = "RedisCache could not set the value."
                    raise ValueError(msg)
        except TypeError as exc:
            msg = "RedisCache only accepts values that can be pickled. "
            raise TypeError(msg) from exc

    @override
    async def upsert(self, key, value, lock=None) -> None:
        """Inserts or updates a value in the cache.

        If the existing value and the new value are both dictionaries, they are merged.

        Args:
            key: The key of the item. ``None`` is ignored.
            value: The value to insert or update.
            lock: Unused; accepted for interface compatibility.
        """
        if key is None:
            return
        existing_value = await self.get(key)
        # ``get`` signals a missing key with CACHE_MISS, not None.
        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
            existing_value.update(value)
            value = existing_value

        await self.set(key, value)

    @override
    async def delete(self, key, lock=None) -> None:
        """Remove ``key`` from Redis.

        The key is converted with ``str()`` so it matches the form used by
        ``get``, ``set`` and ``contains``.
        """
        await self._client.delete(str(key))

    @override
    async def clear(self, lock=None) -> None:
        """Clear all items from the cache (flushes the selected Redis database)."""
        await self._client.flushdb()

    async def contains(self, key) -> bool:
        """Check if the key is in the cache."""
        if key is None:
            return False
        return bool(await self._client.exists(str(key)))

    def __repr__(self) -> str:
        """Return a string representation of the RedisCache instance."""
        return f"RedisCache(expiration_time={self.expiration_time})"


class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
    """An asyncio in-memory cache backed by an OrderedDict.

    Supports an optional maximum size with Least Recently Used (LRU) eviction and an
    optional expiration time. Public operations are guarded by an ``asyncio.Lock``
    unless the caller supplies its own lock.

    Attributes:
        max_size (int, optional): Maximum number of items to store in the cache.
        expiration_time (int, optional): Time in seconds after which a cached item expires.
            Default is 1 hour. ``None`` disables expiration.
    """

    def __init__(self, max_size=None, expiration_time=3600) -> None:
        """Initialize a new AsyncInMemoryCache instance."""
        self.cache: OrderedDict = OrderedDict()
        self.lock = asyncio.Lock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    async def get(self, key, lock: asyncio.Lock | None = None):
        """Return the value stored under ``key``, or CACHE_MISS if absent or expired."""
        async with lock or self.lock:
            return await self._get(key)

    async def _get(self, key):
        """Look up ``key`` without acquiring the lock; expired entries are removed."""
        item = self.cache.get(key, None)
        if item:
            # ``None`` means entries never expire, matching ThreadingInMemoryCache.
            if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
                self.cache.move_to_end(key)
                return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
            # Log before deleting the expired item
            logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
            await self._delete(key)
        return CACHE_MISS

    async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Store ``value`` under ``key``, evicting the least recently used item if full."""
        async with lock or self.lock:
            await self._set(
                key,
                value,
            )

    async def _set(self, key, value) -> None:
        """Store ``value`` under ``key`` without acquiring the lock."""
        # Only evict when a new key would grow the cache; overwriting an existing key
        # must not push out another entry.
        if key not in self.cache and self.max_size and len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)
        self.cache[key] = {"value": value, "time": time.time()}
        self.cache.move_to_end(key)

    async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
        """Remove ``key`` from the cache; a missing key is ignored."""
        async with lock or self.lock:
            await self._delete(key)

    async def _delete(self, key) -> None:
        """Remove ``key`` without acquiring the lock."""
        if key in self.cache:
            del self.cache[key]

    async def clear(self, lock: asyncio.Lock | None = None) -> None:
        """Clear all items from the cache."""
        async with lock or self.lock:
            await self._clear()

    async def _clear(self) -> None:
        """Clear all items without acquiring the lock."""
        self.cache.clear()

    async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Insert or update a value; two dictionaries are merged.

        Args:
            key: The key of the item.
            value: The value to insert or update.
            lock: A lock to use for the operation. Defaults to the cache's own lock.
        """
        await self._upsert(key, value, lock)

    async def _upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Perform the read-merge-write of ``upsert`` under a single lock acquisition."""
        # asyncio.Lock is not re-entrant, so use the lock-free helpers while holding it.
        async with lock or self.lock:
            existing_value = await self._get(key)
            if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
                existing_value.update(value)
                value = existing_value
            await self._set(key, value)

    async def contains(self, key) -> bool:
        """Check if the key is in the cache (no lock taken, no expiration check)."""
        return key in self.cache