Tai Truong
fix readme
d202ada
import asyncio
from typing import TYPE_CHECKING
from langflow.services.base import Service
from langflow.services.cache.base import AsyncBaseCacheService
from langflow.services.cache.utils import CacheMiss
from langflow.services.session.utils import compute_dict_hash, session_id_generator
if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
class SessionService(Service):
    """Store and retrieve per-session ``(graph, artifacts)`` pairs via a cache service.

    The wrapped cache service may be asynchronous (an ``AsyncBaseCacheService``)
    or synchronous; every cache access goes through ``_call_cache`` so both
    kinds are handled the same way.
    """

    name = "session_service"

    def __init__(self, cache_service) -> None:
        """Keep a reference to the cache service used as the session store."""
        self.cache_service: CacheService | AsyncBaseCacheService = cache_service

    async def _call_cache(self, method_name: str, *args):
        """Invoke ``self.cache_service.<method_name>(*args)`` without blocking the loop.

        Asynchronous cache services are awaited directly; any other cache
        service is treated as synchronous and run in a worker thread.
        """
        method = getattr(self.cache_service, method_name)
        if isinstance(self.cache_service, AsyncBaseCacheService):
            return await method(*args)
        return await asyncio.to_thread(method, *args)

    async def load_session(self, key, flow_id: str, data_graph: dict | None = None):
        """Return the session stored under ``key``, building and caching it on a miss.

        Args:
            key: Cache key of the session. When ``None`` and a graph has to be
                built, a new key is generated from ``data_graph``.
            flow_id: Flow identifier forwarded to ``Graph.from_payload``.
            data_graph: Payload used to build the graph when nothing is cached.

        Returns:
            The cached value as stored when the lookup hits; otherwise the
            freshly built ``(graph, artifacts)`` tuple, or ``(None, None)``
            when there is no cached value and no ``data_graph`` to build from.
        """
        # Check if the data is cached
        value = await self._call_cache("get", key)
        if not isinstance(value, CacheMiss):
            return value

        # Nothing cached and nothing to build from. Checked before generating
        # a key so that a missing graph is never passed to the hashing helper.
        if data_graph is None:
            return None, None
        if key is None:
            key = self.generate_key(session_id=None, data_graph=data_graph)

        # If not cached, build the graph and cache it.
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        from langflow.graph.graph.base import Graph

        graph = Graph.from_payload(data_graph, flow_id=flow_id)
        artifacts: dict = {}
        # Use the same sync/async dispatch as every other cache access:
        # awaiting a synchronous ``set`` directly would fail.
        await self._call_cache("set", key, (graph, artifacts))
        return graph, artifacts

    def build_key(self, session_id, data_graph) -> str:
        """Combine ``session_id`` and the hash of ``data_graph`` into a cache key.

        The ``:`` separator is only inserted when ``session_id`` is truthy.
        """
        json_hash = compute_dict_hash(data_graph)
        return f"{session_id}{':' if session_id else ''}{json_hash}"

    def generate_key(self, session_id, data_graph):
        """Return a cache key for ``data_graph``, generating a session id if none is given."""
        # Hash the JSON and combine it with the session_id to create a unique key
        if session_id is None:
            # No session id supplied: generate one to concatenate with the json_hash
            session_id = session_id_generator()
        return self.build_key(session_id, data_graph=data_graph)

    async def update_session(self, session_id, value) -> None:
        """Store ``value`` in the cache under ``session_id``."""
        await self._call_cache("set", session_id, value)

    async def clear_session(self, session_id) -> None:
        """Delete the cache entry stored under ``session_id``."""
        await self._call_cache("delete", session_id)