Spaces:
Running
Running
import boto3 | |
from botocore.exceptions import ClientError, NoCredentialsError | |
from loguru import logger | |
from .service import StorageService | |
class S3StorageService(StorageService): | |
"""A service class for handling operations with AWS S3 storage.""" | |
def __init__(self, session_service, settings_service) -> None: | |
"""Initialize the S3 storage service with session and settings services.""" | |
super().__init__(session_service, settings_service) | |
self.bucket = "langflow" | |
self.s3_client = boto3.client("s3") | |
self.set_ready() | |
async def save_file(self, folder: str, file_name: str, data) -> None: | |
"""Save a file to the S3 bucket. | |
:param folder: The folder in the bucket to save the file. | |
:param file_name: The name of the file to be saved. | |
:param data: The byte content of the file. | |
:raises Exception: If an error occurs during file saving. | |
""" | |
try: | |
self.s3_client.put_object(Bucket=self.bucket, Key=f"{folder}/{file_name}", Body=data) | |
logger.info(f"File {file_name} saved successfully in folder {folder}.") | |
except NoCredentialsError: | |
logger.exception("Credentials not available for AWS S3.") | |
raise | |
except ClientError: | |
logger.exception(f"Error saving file {file_name} in folder {folder}") | |
raise | |
async def get_file(self, folder: str, file_name: str): | |
"""Retrieve a file from the S3 bucket. | |
:param folder: The folder in the bucket where the file is stored. | |
:param file_name: The name of the file to be retrieved. | |
:return: The byte content of the file. | |
:raises Exception: If an error occurs during file retrieval. | |
""" | |
try: | |
response = self.s3_client.get_object(Bucket=self.bucket, Key=f"{folder}/{file_name}") | |
logger.info(f"File {file_name} retrieved successfully from folder {folder}.") | |
return response["Body"].read() | |
except ClientError: | |
logger.exception(f"Error retrieving file {file_name} from folder {folder}") | |
raise | |
async def list_files(self, folder: str): | |
"""List all files in a specified folder of the S3 bucket. | |
:param folder: The folder in the bucket to list files from. | |
:return: A list of file names. | |
:raises Exception: If an error occurs during file listing. | |
""" | |
try: | |
response = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=folder) | |
except ClientError: | |
logger.exception(f"Error listing files in folder {folder}") | |
raise | |
files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]] | |
logger.info(f"{len(files)} files listed in folder {folder}.") | |
return files | |
async def delete_file(self, folder: str, file_name: str) -> None: | |
"""Delete a file from the S3 bucket. | |
:param folder: The folder in the bucket where the file is stored. | |
:param file_name: The name of the file to be deleted. | |
:raises Exception: If an error occurs during file deletion. | |
""" | |
try: | |
self.s3_client.delete_object(Bucket=self.bucket, Key=f"{folder}/{file_name}") | |
logger.info(f"File {file_name} deleted successfully from folder {folder}.") | |
except ClientError: | |
logger.exception(f"Error deleting file {file_name} from folder {folder}") | |
raise | |
async def teardown(self) -> None: | |
"""Perform any cleanup operations when the service is being torn down.""" | |
# No specific teardown actions required for S3 storage at the moment. | |