from typing import Any import socketio from loguru import logger from langflow.services.base import Service from langflow.services.cache.base import AsyncBaseCacheService, CacheService from langflow.services.deps import get_chat_service from langflow.services.socket.utils import build_vertex, get_vertices class SocketIOService(Service): name = "socket_service" def __init__(self, cache_service: CacheService | AsyncBaseCacheService): self.cache_service = cache_service def init(self, sio: socketio.AsyncServer) -> None: # Registering event handlers self.sio = sio if self.sio: self.sio.event(self.connect) self.sio.event(self.disconnect) self.sio.on("message")(self.message) self.sio.on("get_vertices")(self.on_get_vertices) self.sio.on("build_vertex")(self.on_build_vertex) self.sessions = {} # type: dict[str, dict] async def emit_error(self, sid, error) -> None: await self.sio.emit("error", to=sid, data=error) async def connect(self, sid, environ) -> None: logger.info(f"Socket connected: {sid}") self.sessions[sid] = environ async def disconnect(self, sid) -> None: logger.info(f"Socket disconnected: {sid}") self.sessions.pop(sid, None) async def message(self, sid, data=None) -> None: # Logic for handling messages await self.emit_message(to=sid, data=data or {"foo": "bar", "baz": [1, 2, 3]}) async def emit_message(self, to, data) -> None: # Abstracting sio.emit await self.sio.emit("message", to=to, data=data) async def emit_token(self, to, data) -> None: await self.sio.emit("token", to=to, data=data) async def on_get_vertices(self, sid, flow_id) -> None: await get_vertices(self.sio, sid, flow_id, get_chat_service()) async def on_build_vertex(self, sid, flow_id, vertex_id) -> None: await build_vertex( sio=self.sio, sid=sid, flow_id=flow_id, vertex_id=vertex_id, get_cache=self.get_cache, set_cache=self.set_cache, ) async def get_cache(self, sid: str) -> Any: """Get the cache for a client.""" value = self.cache_service.get(sid) if isinstance(self.cache_service, AsyncBaseCacheService): return await value return value async def set_cache(self, sid: str, build_result: Any) -> bool: """Set the cache for a client.""" # client_id is the flow id but that already exists in the cache # so we need to change it to something else result_dict = { "result": build_result, "type": type(build_result), } result = self.cache_service.upsert(sid, result_dict) if isinstance(self.cache_service, AsyncBaseCacheService): await result return await self.cache_service.contains(sid) return sid in self.cache_service