import asyncio from collections import defaultdict from threading import RLock from typing import Any from langflow.services.base import Service from langflow.services.cache.base import AsyncBaseCacheService, CacheService from langflow.services.deps import get_cache_service class ChatService(Service): """Service class for managing chat-related operations.""" name = "chat_service" def __init__(self) -> None: self.async_cache_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self._sync_cache_locks: dict[str, RLock] = defaultdict(RLock) self.cache_service: CacheService | AsyncBaseCacheService = get_cache_service() async def set_cache(self, key: str, data: Any, lock: asyncio.Lock | None = None) -> bool: """Set the cache for a client. Args: key (str): The cache key. data (Any): The data to be cached. lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None. Returns: bool: True if the cache was set successfully, False otherwise. """ result_dict = { "result": data, "type": type(data), } if isinstance(self.cache_service, AsyncBaseCacheService): await self.cache_service.upsert(str(key), result_dict, lock=lock or self.async_cache_locks[key]) return await self.cache_service.contains(key) await asyncio.to_thread( self.cache_service.upsert, str(key), result_dict, lock=lock or self._sync_cache_locks[key] ) return key in self.cache_service async def get_cache(self, key: str, lock: asyncio.Lock | None = None) -> Any: """Get the cache for a client. Args: key (str): The cache key. lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None. Returns: Any: The cached data. """ if isinstance(self.cache_service, AsyncBaseCacheService): return await self.cache_service.get(key, lock=lock or self.async_cache_locks[key]) return await asyncio.to_thread(self.cache_service.get, key, lock=lock or self._sync_cache_locks[key]) async def clear_cache(self, key: str, lock: asyncio.Lock | None = None) -> None: """Clear the cache for a client. Args: key (str): The cache key. lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None. """ if isinstance(self.cache_service, AsyncBaseCacheService): return await self.cache_service.delete(key, lock=lock or self.async_cache_locks[key]) return await asyncio.to_thread(self.cache_service.delete, key, lock=lock or self._sync_cache_locks[key])