Tai Truong
fix readme
d202ada
raw
history blame
12.8 kB
import asyncio
import pickle
import threading
import time
from collections import OrderedDict
from typing import Generic, Union
from loguru import logger
from typing_extensions import override
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType
from langflow.services.cache.utils import CACHE_MISS
class ThreadingInMemoryCache(CacheService, Generic[LockType]):
"""A simple in-memory cache using an OrderedDict.
This cache supports setting a maximum size and expiration time for cached items.
When the cache is full, it uses a Least Recently Used (LRU) eviction policy.
Thread-safe using a threading Lock.
Attributes:
max_size (int, optional): Maximum number of items to store in the cache.
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
Example:
cache = InMemoryCache(max_size=3, expiration_time=5)
# setting cache values
cache.set("a", 1)
cache.set("b", 2)
cache["c"] = 3
# getting cache values
a = cache.get("a")
b = cache["b"]
"""
def __init__(self, max_size=None, expiration_time=60 * 60) -> None:
"""Initialize a new InMemoryCache instance.
Args:
max_size (int, optional): Maximum number of items to store in the cache.
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
"""
self._cache: OrderedDict = OrderedDict()
self._lock = threading.RLock()
self.max_size = max_size
self.expiration_time = expiration_time
def get(self, key, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""Retrieve an item from the cache.
Args:
key: The key of the item to retrieve.
lock: A lock to use for the operation.
Returns:
The value associated with the key, or CACHE_MISS if the key is not found or the item has expired.
"""
with lock or self._lock:
return self._get_without_lock(key)
def _get_without_lock(self, key):
"""Retrieve an item from the cache without acquiring the lock."""
if item := self._cache.get(key):
if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
# Move the key to the end to make it recently used
self._cache.move_to_end(key)
# Check if the value is pickled
return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
self.delete(key)
return CACHE_MISS
def set(self, key, value, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007
"""Add an item to the cache.
If the cache is full, the least recently used item is evicted.
Args:
key: The key of the item.
value: The value to cache.
lock: A lock to use for the operation.
"""
with lock or self._lock:
if key in self._cache:
# Remove existing key before re-inserting to update order
self.delete(key)
elif self.max_size and len(self._cache) >= self.max_size:
# Remove least recently used item
self._cache.popitem(last=False)
# pickle locally to mimic Redis
self._cache[key] = {"value": value, "time": time.time()}
def upsert(self, key, value, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007
"""Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
lock: A lock to use for the operation.
"""
with lock or self._lock:
existing_value = self._get_without_lock(key)
if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
self.set(key, value)
def get_or_set(self, key, value, lock: Union[threading.Lock, None] = None): # noqa: UP007
"""Retrieve an item from the cache.
If the item does not exist, set it with the provided value.
Args:
key: The key of the item.
value: The value to cache if the item doesn't exist.
lock: A lock to use for the operation.
Returns:
The cached value associated with the key.
"""
with lock or self._lock:
if key in self._cache:
return self.get(key)
self.set(key, value)
return value
def delete(self, key, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007
with lock or self._lock:
self._cache.pop(key, None)
def clear(self, lock: Union[threading.Lock, None] = None) -> None: # noqa: UP007
"""Clear all items from the cache."""
with lock or self._lock:
self._cache.clear()
def contains(self, key) -> bool:
"""Check if the key is in the cache."""
return key in self._cache
def __contains__(self, key) -> bool:
"""Check if the key is in the cache."""
return self.contains(key)
def __getitem__(self, key):
"""Retrieve an item from the cache using the square bracket notation."""
return self.get(key)
def __setitem__(self, key, value) -> None:
"""Add an item to the cache using the square bracket notation."""
self.set(key, value)
def __delitem__(self, key) -> None:
"""Remove an item from the cache using the square bracket notation."""
self.delete(key)
def __len__(self) -> int:
"""Return the number of items in the cache."""
return len(self._cache)
def __repr__(self) -> str:
"""Return a string representation of the InMemoryCache instance."""
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"
class RedisCache(AsyncBaseCacheService, Generic[LockType]):
"""A Redis-based cache implementation.
This cache supports setting an expiration time for cached items.
Attributes:
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
Example:
cache = RedisCache(expiration_time=5)
# setting cache values
cache.set("a", 1)
cache.set("b", 2)
cache["c"] = 3
# getting cache values
a = cache.get("a")
b = cache["b"]
"""
def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60) -> None:
"""Initialize a new RedisCache instance.
Args:
host (str, optional): Redis host.
port (int, optional): Redis port.
db (int, optional): Redis DB.
url (str, optional): Redis URL.
expiration_time (int, optional): Time in seconds after which a
cached item expires. Default is 1 hour.
"""
try:
from redis.asyncio import StrictRedis
except ImportError as exc:
msg = (
"RedisCache requires the redis-py package."
" Please install Langflow with the deploy extra: pip install langflow[deploy]"
)
raise ImportError(msg) from exc
logger.warning(
"RedisCache is an experimental feature and may not work as expected."
" Please report any issues to our GitHub repository."
)
if url:
self._client = StrictRedis.from_url(url)
else:
self._client = StrictRedis(host=host, port=port, db=db)
self.expiration_time = expiration_time
# check connection
def is_connected(self) -> bool:
"""Check if the Redis client is connected."""
import redis
try:
asyncio.run(self._client.ping())
except redis.exceptions.ConnectionError:
logger.exception("RedisCache could not connect to the Redis server")
return False
return True
@override
async def get(self, key, lock=None):
if key is None:
return CACHE_MISS
value = await self._client.get(str(key))
return pickle.loads(value) if value else CACHE_MISS
@override
async def set(self, key, value, lock=None) -> None:
try:
if pickled := pickle.dumps(value):
result = await self._client.setex(str(key), self.expiration_time, pickled)
if not result:
msg = "RedisCache could not set the value."
raise ValueError(msg)
except TypeError as exc:
msg = "RedisCache only accepts values that can be pickled. "
raise TypeError(msg) from exc
@override
async def upsert(self, key, value, lock=None) -> None:
"""Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
lock: A lock to use for the operation.
"""
if key is None:
return
existing_value = await self.get(key)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
await self.set(key, value)
@override
async def delete(self, key, lock=None) -> None:
await self._client.delete(key)
@override
async def clear(self, lock=None) -> None:
"""Clear all items from the cache."""
await self._client.flushdb()
async def contains(self, key) -> bool:
"""Check if the key is in the cache."""
if key is None:
return False
return bool(await self._client.exists(str(key)))
def __repr__(self) -> str:
"""Return a string representation of the RedisCache instance."""
return f"RedisCache(expiration_time={self.expiration_time})"
class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
def __init__(self, max_size=None, expiration_time=3600) -> None:
self.cache: OrderedDict = OrderedDict()
self.lock = asyncio.Lock()
self.max_size = max_size
self.expiration_time = expiration_time
async def get(self, key, lock: asyncio.Lock | None = None):
async with lock or self.lock:
return await self._get(key)
async def _get(self, key):
item = self.cache.get(key, None)
if item:
if time.time() - item["time"] < self.expiration_time:
self.cache.move_to_end(key)
return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
await self._delete(key) # Log before deleting the expired item
return CACHE_MISS
async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
async with lock or self.lock:
await self._set(
key,
value,
)
async def _set(self, key, value) -> None:
if self.max_size and len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = {"value": value, "time": time.time()}
self.cache.move_to_end(key)
async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
async with lock or self.lock:
await self._delete(key)
async def _delete(self, key) -> None:
if key in self.cache:
del self.cache[key]
async def clear(self, lock: asyncio.Lock | None = None) -> None:
async with lock or self.lock:
await self._clear()
async def _clear(self) -> None:
self.cache.clear()
async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
await self._upsert(key, value, lock)
async def _upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
existing_value = await self.get(key, lock)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
await self.set(key, value, lock)
async def contains(self, key) -> bool:
return key in self.cache