Tai Truong
fix readme
d202ada
import asyncio
import pickle
import time
from typing import Generic
from diskcache import Cache
from loguru import logger
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType
from langflow.services.cache.utils import CACHE_MISS
class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]):
def __init__(self, cache_dir, max_size=None, expiration_time=3600) -> None:
self.cache = Cache(cache_dir)
# Let's clear the cache for now to maintain a similar
# behavior as the in-memory cache
# Later we should implement endpoints for the frontend to grab
# output logs from the cache
if len(self.cache) > 0:
self.cache.clear()
self.lock = asyncio.Lock()
self.max_size = max_size
self.expiration_time = expiration_time
async def get(self, key, lock: asyncio.Lock | None = None):
if not lock:
async with self.lock:
return await asyncio.to_thread(self._get, key)
else:
return await asyncio.to_thread(self._get, key)
def _get(self, key):
item = self.cache.get(key, default=None)
if item:
if time.time() - item["time"] < self.expiration_time:
self.cache.touch(key) # Refresh the expiry time
return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
self.cache.delete(key) # Log before deleting the expired item
return CACHE_MISS
async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._set(key, value)
else:
await self._set(key, value)
async def _set(self, key, value) -> None:
if self.max_size and len(self.cache) >= self.max_size:
await asyncio.to_thread(self.cache.cull)
item = {"value": pickle.dumps(value) if not isinstance(value, str | bytes) else value, "time": time.time()}
await asyncio.to_thread(self.cache.set, key, item)
async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._delete(key)
else:
await self._delete(key)
async def _delete(self, key) -> None:
await asyncio.to_thread(self.cache.delete, key)
async def clear(self, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._clear()
else:
await self._clear()
async def _clear(self) -> None:
await asyncio.to_thread(self.cache.clear)
async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._upsert(key, value)
else:
await self._upsert(key, value)
async def _upsert(self, key, value) -> None:
existing_value = await asyncio.to_thread(self._get, key)
if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
await self.set(key, value)
async def contains(self, key) -> bool:
return await asyncio.to_thread(self.cache.__contains__, key)
async def teardown(self) -> None:
# Clean up the cache directory
self.cache.clear(retry=True)