Spaces:
Running
Running
import asyncio | |
import pickle | |
import time | |
from typing import Generic | |
from diskcache import Cache | |
from loguru import logger | |
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType | |
from langflow.services.cache.utils import CACHE_MISS | |
class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]): | |
def __init__(self, cache_dir, max_size=None, expiration_time=3600) -> None: | |
self.cache = Cache(cache_dir) | |
# Let's clear the cache for now to maintain a similar | |
# behavior as the in-memory cache | |
# Later we should implement endpoints for the frontend to grab | |
# output logs from the cache | |
if len(self.cache) > 0: | |
self.cache.clear() | |
self.lock = asyncio.Lock() | |
self.max_size = max_size | |
self.expiration_time = expiration_time | |
async def get(self, key, lock: asyncio.Lock | None = None): | |
if not lock: | |
async with self.lock: | |
return await asyncio.to_thread(self._get, key) | |
else: | |
return await asyncio.to_thread(self._get, key) | |
def _get(self, key): | |
item = self.cache.get(key, default=None) | |
if item: | |
if time.time() - item["time"] < self.expiration_time: | |
self.cache.touch(key) # Refresh the expiry time | |
return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] | |
logger.info(f"Cache item for key '{key}' has expired and will be deleted.") | |
self.cache.delete(key) # Log before deleting the expired item | |
return CACHE_MISS | |
async def set(self, key, value, lock: asyncio.Lock | None = None) -> None: | |
if not lock: | |
async with self.lock: | |
await self._set(key, value) | |
else: | |
await self._set(key, value) | |
async def _set(self, key, value) -> None: | |
if self.max_size and len(self.cache) >= self.max_size: | |
await asyncio.to_thread(self.cache.cull) | |
item = {"value": pickle.dumps(value) if not isinstance(value, str | bytes) else value, "time": time.time()} | |
await asyncio.to_thread(self.cache.set, key, item) | |
async def delete(self, key, lock: asyncio.Lock | None = None) -> None: | |
if not lock: | |
async with self.lock: | |
await self._delete(key) | |
else: | |
await self._delete(key) | |
async def _delete(self, key) -> None: | |
await asyncio.to_thread(self.cache.delete, key) | |
async def clear(self, lock: asyncio.Lock | None = None) -> None: | |
if not lock: | |
async with self.lock: | |
await self._clear() | |
else: | |
await self._clear() | |
async def _clear(self) -> None: | |
await asyncio.to_thread(self.cache.clear) | |
async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None: | |
if not lock: | |
async with self.lock: | |
await self._upsert(key, value) | |
else: | |
await self._upsert(key, value) | |
async def _upsert(self, key, value) -> None: | |
existing_value = await asyncio.to_thread(self._get, key) | |
if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict): | |
existing_value.update(value) | |
value = existing_value | |
await self.set(key, value) | |
async def contains(self, key) -> bool: | |
return await asyncio.to_thread(self.cache.__contains__, key) | |
async def teardown(self) -> None: | |
# Clean up the cache directory | |
self.cache.clear(retry=True) | |