from collections import defaultdict
from collections.abc import Callable
from threading import Lock

from loguru import logger

from langflow.services.base import Service
from langflow.services.settings.service import SettingsService


class StateService(Service):
    """Interface for a keyed, per-run state store with change observers.

    Every method raises ``NotImplementedError``; concrete services such as
    ``InMemoryStateService`` override them.
    """

    name = "state_service"

    def append_state(self, key, new_state, run_id: str) -> None:
        """Append ``new_state`` to the value stored under ``key`` for ``run_id``."""
        raise NotImplementedError

    def update_state(self, key, new_state, run_id: str) -> None:
        """Replace the value stored under ``key`` for ``run_id`` with ``new_state``."""
        raise NotImplementedError

    def get_state(self, key, run_id: str):
        """Return the value stored under ``key`` for ``run_id``."""
        raise NotImplementedError

    def subscribe(self, key, observer: Callable) -> None:
        """Register ``observer`` to be notified about changes to ``key``."""
        raise NotImplementedError

    def notify_observers(self, key, new_state) -> None:
        """Tell the observers of ``key`` that its value is now ``new_state``."""
        raise NotImplementedError


class InMemoryStateService(StateService):
    """State service that keeps everything in process-local dictionaries.

    ``states`` maps ``run_id -> {key: value}`` and ``observers`` maps
    ``key -> [callback, ...]``. Mutations of both mappings made through this
    class happen while holding ``lock``.

    Observers are invoked *after* ``lock`` has been released. ``lock`` is a
    plain (non-reentrant) ``threading.Lock``, so calling observers while
    holding it would deadlock any observer that calls back into this service.
    """

    def __init__(self, settings_service: SettingsService):
        """Store ``settings_service`` and create empty state/observer tables."""
        self.settings_service = settings_service
        self.states: dict = {}
        self.observers: dict = defaultdict(list)
        self.lock = Lock()

    def append_state(self, key, new_state, run_id: str) -> None:
        """Append ``new_state`` to the list stored under ``key`` for ``run_id``.

        A missing key starts as an empty list; an existing non-list value is
        wrapped into a one-element list before appending. Observers of ``key``
        are then called with ``append=True``.
        """
        with self.lock:
            run_states = self.states.setdefault(run_id, {})
            if key not in run_states:
                run_states[key] = []
            elif not isinstance(run_states[key], list):
                run_states[key] = [run_states[key]]
            run_states[key].append(new_state)
        # Notify outside the lock so observers may safely re-enter the service.
        self.notify_append_observers(key, new_state)

    def update_state(self, key, new_state, run_id: str) -> None:
        """Overwrite the value under ``key`` for ``run_id`` and notify observers.

        Observers of ``key`` are called with ``append=False``. An exception
        raised by an observer propagates to the caller; the state has already
        been updated at that point.
        """
        with self.lock:
            self.states.setdefault(run_id, {})[key] = new_state
        # Notify outside the lock so observers may safely re-enter the service.
        self.notify_observers(key, new_state)

    def get_state(self, key, run_id: str):
        """Return the value under ``key`` for ``run_id``.

        Returns the empty string ``""`` when either ``run_id`` or ``key`` is
        unknown.
        """
        with self.lock:
            return self.states.get(run_id, {}).get(key, "")

    def subscribe(self, key, observer: Callable) -> None:
        """Register ``observer`` for ``key`` unless it is already registered.

        Observers are called as ``observer(key, new_state, append=<bool>)``.
        """
        with self.lock:
            registered = self.observers[key]
            if observer not in registered:
                registered.append(observer)

    def notify_observers(self, key, new_state) -> None:
        """Call every observer of ``key`` with ``append=False``.

        Observer exceptions are not caught here: the first one that raises
        stops the dispatch and propagates to the caller.
        """
        # ``.get`` avoids inserting an empty list into the defaultdict for keys
        # nobody subscribed to; the tuple copy keeps iteration stable if an
        # observer subscribes to ``key`` during dispatch.
        for callback in tuple(self.observers.get(key, ())):
            callback(key, new_state, append=False)

    def notify_append_observers(self, key, new_state) -> None:
        """Call every observer of ``key`` with ``append=True``.

        An exception raised by one observer is logged and does not prevent
        the remaining observers from being called.
        """
        # Same snapshot/no-insert rationale as in ``notify_observers``.
        for callback in tuple(self.observers.get(key, ())):
            try:
                callback(key, new_state, append=True)
            except Exception:  # noqa: BLE001
                logger.exception(f"Error in observer {callback} for key {key}")
                # NOTE(review): this warning text looks stale (callbacks are
                # dispatched just above) -- kept as-is; confirm before changing.
                logger.warning("Callbacks not implemented yet")