Spaces:
Running
Running
import abc | |
import asyncio | |
import threading | |
from typing import Generic, TypeVar | |
from langflow.services.base import Service | |
LockType = TypeVar("LockType", bound=threading.Lock) | |
AsyncLockType = TypeVar("AsyncLockType", bound=asyncio.Lock) | |
class CacheService(Service, Generic[LockType]): | |
"""Abstract base class for a cache.""" | |
name = "cache_service" | |
def get(self, key, lock: LockType | None = None): | |
"""Retrieve an item from the cache. | |
Args: | |
key: The key of the item to retrieve. | |
lock: A lock to use for the operation. | |
Returns: | |
The value associated with the key, or CACHE_MISS if the key is not found. | |
""" | |
def set(self, key, value, lock: LockType | None = None): | |
"""Add an item to the cache. | |
Args: | |
key: The key of the item. | |
value: The value to cache. | |
lock: A lock to use for the operation. | |
""" | |
def upsert(self, key, value, lock: LockType | None = None): | |
"""Add an item to the cache if it doesn't exist, or update it if it does. | |
Args: | |
key: The key of the item. | |
value: The value to cache. | |
lock: A lock to use for the operation. | |
""" | |
def delete(self, key, lock: LockType | None = None): | |
"""Remove an item from the cache. | |
Args: | |
key: The key of the item to remove. | |
lock: A lock to use for the operation. | |
""" | |
def clear(self, lock: LockType | None = None): | |
"""Clear all items from the cache.""" | |
def contains(self, key) -> bool: | |
"""Check if the key is in the cache. | |
Args: | |
key: The key of the item to check. | |
Returns: | |
True if the key is in the cache, False otherwise. | |
""" | |
def __contains__(self, key) -> bool: | |
"""Check if the key is in the cache. | |
Args: | |
key: The key of the item to check. | |
Returns: | |
True if the key is in the cache, False otherwise. | |
""" | |
def __getitem__(self, key): | |
"""Retrieve an item from the cache using the square bracket notation. | |
Args: | |
key: The key of the item to retrieve. | |
""" | |
def __setitem__(self, key, value) -> None: | |
"""Add an item to the cache using the square bracket notation. | |
Args: | |
key: The key of the item. | |
value: The value to cache. | |
""" | |
def __delitem__(self, key) -> None: | |
"""Remove an item from the cache using the square bracket notation. | |
Args: | |
key: The key of the item to remove. | |
""" | |
class AsyncBaseCacheService(Service, Generic[AsyncLockType]): | |
"""Abstract base class for a async cache.""" | |
name = "cache_service" | |
async def get(self, key, lock: AsyncLockType | None = None): | |
"""Retrieve an item from the cache. | |
Args: | |
key: The key of the item to retrieve. | |
lock: A lock to use for the operation. | |
Returns: | |
The value associated with the key, or CACHE_MISS if the key is not found. | |
""" | |
async def set(self, key, value, lock: AsyncLockType | None = None): | |
"""Add an item to the cache. | |
Args: | |
key: The key of the item. | |
value: The value to cache. | |
lock: A lock to use for the operation. | |
""" | |
async def upsert(self, key, value, lock: AsyncLockType | None = None): | |
"""Add an item to the cache if it doesn't exist, or update it if it does. | |
Args: | |
key: The key of the item. | |
value: The value to cache. | |
lock: A lock to use for the operation. | |
""" | |
async def delete(self, key, lock: AsyncLockType | None = None): | |
"""Remove an item from the cache. | |
Args: | |
key: The key of the item to remove. | |
lock: A lock to use for the operation. | |
""" | |
async def clear(self, lock: AsyncLockType | None = None): | |
"""Clear all items from the cache.""" | |
async def contains(self, key) -> bool: | |
"""Check if the key is in the cache. | |
Args: | |
key: The key of the item to check. | |
Returns: | |
True if the key is in the cache, False otherwise. | |
""" | |