Spaces:
Running
Running
import asyncio | |
from typing import TYPE_CHECKING | |
from langflow.services.base import Service | |
from langflow.services.cache.base import AsyncBaseCacheService | |
from langflow.services.cache.utils import CacheMiss | |
from langflow.services.session.utils import compute_dict_hash, session_id_generator | |
if TYPE_CHECKING: | |
from langflow.services.cache.base import CacheService | |
class SessionService(Service): | |
name = "session_service" | |
def __init__(self, cache_service) -> None: | |
self.cache_service: CacheService | AsyncBaseCacheService = cache_service | |
async def load_session(self, key, flow_id: str, data_graph: dict | None = None): | |
# Check if the data is cached | |
if isinstance(self.cache_service, AsyncBaseCacheService): | |
value = await self.cache_service.get(key) | |
else: | |
value = await asyncio.to_thread(self.cache_service.get, key) | |
if not isinstance(value, CacheMiss): | |
return value | |
if key is None: | |
key = self.generate_key(session_id=None, data_graph=data_graph) | |
if data_graph is None: | |
return None, None | |
# If not cached, build the graph and cache it | |
from langflow.graph.graph.base import Graph | |
graph = Graph.from_payload(data_graph, flow_id=flow_id) | |
artifacts: dict = {} | |
await self.cache_service.set(key, (graph, artifacts)) | |
return graph, artifacts | |
def build_key(self, session_id, data_graph) -> str: | |
json_hash = compute_dict_hash(data_graph) | |
return f"{session_id}{':' if session_id else ''}{json_hash}" | |
def generate_key(self, session_id, data_graph): | |
# Hash the JSON and combine it with the session_id to create a unique key | |
if session_id is None: | |
# generate a 5 char session_id to concatenate with the json_hash | |
session_id = session_id_generator() | |
return self.build_key(session_id, data_graph=data_graph) | |
async def update_session(self, session_id, value) -> None: | |
if isinstance(self.cache_service, AsyncBaseCacheService): | |
await self.cache_service.set(session_id, value) | |
else: | |
await asyncio.to_thread(self.cache_service.set, session_id, value) | |
async def clear_session(self, session_id) -> None: | |
if isinstance(self.cache_service, AsyncBaseCacheService): | |
await self.cache_service.delete(session_id) | |
else: | |
await asyncio.to_thread(self.cache_service.delete, session_id) | |