Tai Truong
fix readme
d202ada
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING
from langflow.services.base import Service
if TYPE_CHECKING:
from langflow.services.session.service import SessionService
from langflow.services.settings.service import SettingsService
class StorageService(Service):
name = "storage_service"
def __init__(self, session_service: SessionService, settings_service: SettingsService):
self.settings_service = settings_service
self.session_service = session_service
self.set_ready()
def build_full_path(self, flow_id: str, file_name: str) -> str:
raise NotImplementedError
def set_ready(self) -> None:
self.ready = True
@abstractmethod
async def save_file(self, flow_id: str, file_name: str, data) -> None:
raise NotImplementedError
@abstractmethod
async def get_file(self, flow_id: str, file_name: str) -> bytes:
raise NotImplementedError
@abstractmethod
async def list_files(self, flow_id: str) -> list[str]:
raise NotImplementedError
@abstractmethod
async def delete_file(self, flow_id: str, file_name: str) -> None:
raise NotImplementedError
async def teardown(self) -> None:
raise NotImplementedError