from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING

from langflow.services.base import Service

if TYPE_CHECKING:
    # Imported for annotations only; never evaluated at runtime because
    # `from __future__ import annotations` keeps annotations lazy.
    from langflow.services.session.service import SessionService
    from langflow.services.settings.service import SettingsService


class StorageService(Service):
    """Interface for services that store files grouped by flow.

    The file operations declared here (`save_file`, `get_file`,
    `list_files`, `delete_file`) are marked abstract and, like
    `build_full_path` and `teardown`, raise ``NotImplementedError`` in this
    class; concrete storage backends are expected to override them.

    NOTE(review): whether ``@abstractmethod`` actually blocks instantiation
    depends on `Service` using an ABC metaclass, which is not visible from
    this file -- confirm.
    """

    name = "storage_service"

    def __init__(
        self,
        session_service: SessionService,
        settings_service: SettingsService,
    ):
        """Keep references to the collaborating services and mark ready.

        Args:
            session_service: Stored as ``self.session_service``.
            settings_service: Stored as ``self.settings_service``.
        """
        self.session_service = session_service
        self.settings_service = settings_service
        # Called last so an overriding `set_ready` can see both attributes.
        self.set_ready()

    def build_full_path(self, flow_id: str, file_name: str) -> str:
        """Return the full path for ``file_name`` under ``flow_id``.

        Not implemented in this class; always raises.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def set_ready(self) -> None:
        """Flag the service as ready by setting ``self.ready`` to ``True``."""
        self.ready = True

    @abstractmethod
    async def save_file(self, flow_id: str, file_name: str, data) -> None:
        """Store ``data`` as ``file_name`` for the given flow.

        Args:
            flow_id: Identifier of the flow the file belongs to.
            file_name: Name to store the file under.
            data: Content to store (presumably ``bytes``, mirroring the
                return type of `get_file` -- TODO confirm).

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_file(self, flow_id: str, file_name: str) -> bytes:
        """Return the content of ``file_name`` for the given flow as bytes.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def list_files(self, flow_id: str) -> list[str]:
        """Return the names of the files stored for the given flow.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    @abstractmethod
    async def delete_file(self, flow_id: str, file_name: str) -> None:
        """Remove ``file_name`` from the given flow's storage.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    async def teardown(self) -> None:
        """Shut the service down.

        Not implemented in this class; always raises.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError