import asyncio import pickle import time from typing import Generic from diskcache import Cache from loguru import logger from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType from langflow.services.cache.utils import CACHE_MISS class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]): def __init__(self, cache_dir, max_size=None, expiration_time=3600) -> None: self.cache = Cache(cache_dir) # Let's clear the cache for now to maintain a similar # behavior as the in-memory cache # Later we should implement endpoints for the frontend to grab # output logs from the cache if len(self.cache) > 0: self.cache.clear() self.lock = asyncio.Lock() self.max_size = max_size self.expiration_time = expiration_time async def get(self, key, lock: asyncio.Lock | None = None): if not lock: async with self.lock: return await asyncio.to_thread(self._get, key) else: return await asyncio.to_thread(self._get, key) def _get(self, key): item = self.cache.get(key, default=None) if item: if time.time() - item["time"] < self.expiration_time: self.cache.touch(key) # Refresh the expiry time return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] logger.info(f"Cache item for key '{key}' has expired and will be deleted.") self.cache.delete(key) # Log before deleting the expired item return CACHE_MISS async def set(self, key, value, lock: asyncio.Lock | None = None) -> None: if not lock: async with self.lock: await self._set(key, value) else: await self._set(key, value) async def _set(self, key, value) -> None: if self.max_size and len(self.cache) >= self.max_size: await asyncio.to_thread(self.cache.cull) item = {"value": pickle.dumps(value) if not isinstance(value, str | bytes) else value, "time": time.time()} await asyncio.to_thread(self.cache.set, key, item) async def delete(self, key, lock: asyncio.Lock | None = None) -> None: if not lock: async with self.lock: await self._delete(key) else: await self._delete(key) async def _delete(self, key) -> None: await asyncio.to_thread(self.cache.delete, key) async def clear(self, lock: asyncio.Lock | None = None) -> None: if not lock: async with self.lock: await self._clear() else: await self._clear() async def _clear(self) -> None: await asyncio.to_thread(self.cache.clear) async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None: if not lock: async with self.lock: await self._upsert(key, value) else: await self._upsert(key, value) async def _upsert(self, key, value) -> None: existing_value = await asyncio.to_thread(self._get, key) if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict): existing_value.update(value) value = existing_value await self.set(key, value) async def contains(self, key) -> bool: return await asyncio.to_thread(self.cache.__contains__, key) async def teardown(self) -> None: # Clean up the cache directory self.cache.clear(retry=True)