Tai Truong
fix readme
d202ada
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from loguru import logger
from .service import StorageService
class S3StorageService(StorageService):
"""A service class for handling operations with AWS S3 storage."""
def __init__(self, session_service, settings_service) -> None:
"""Initialize the S3 storage service with session and settings services."""
super().__init__(session_service, settings_service)
self.bucket = "langflow"
self.s3_client = boto3.client("s3")
self.set_ready()
async def save_file(self, folder: str, file_name: str, data) -> None:
"""Save a file to the S3 bucket.
:param folder: The folder in the bucket to save the file.
:param file_name: The name of the file to be saved.
:param data: The byte content of the file.
:raises Exception: If an error occurs during file saving.
"""
try:
self.s3_client.put_object(Bucket=self.bucket, Key=f"{folder}/{file_name}", Body=data)
logger.info(f"File {file_name} saved successfully in folder {folder}.")
except NoCredentialsError:
logger.exception("Credentials not available for AWS S3.")
raise
except ClientError:
logger.exception(f"Error saving file {file_name} in folder {folder}")
raise
async def get_file(self, folder: str, file_name: str):
"""Retrieve a file from the S3 bucket.
:param folder: The folder in the bucket where the file is stored.
:param file_name: The name of the file to be retrieved.
:return: The byte content of the file.
:raises Exception: If an error occurs during file retrieval.
"""
try:
response = self.s3_client.get_object(Bucket=self.bucket, Key=f"{folder}/{file_name}")
logger.info(f"File {file_name} retrieved successfully from folder {folder}.")
return response["Body"].read()
except ClientError:
logger.exception(f"Error retrieving file {file_name} from folder {folder}")
raise
async def list_files(self, folder: str):
"""List all files in a specified folder of the S3 bucket.
:param folder: The folder in the bucket to list files from.
:return: A list of file names.
:raises Exception: If an error occurs during file listing.
"""
try:
response = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=folder)
except ClientError:
logger.exception(f"Error listing files in folder {folder}")
raise
files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]]
logger.info(f"{len(files)} files listed in folder {folder}.")
return files
async def delete_file(self, folder: str, file_name: str) -> None:
"""Delete a file from the S3 bucket.
:param folder: The folder in the bucket where the file is stored.
:param file_name: The name of the file to be deleted.
:raises Exception: If an error occurs during file deletion.
"""
try:
self.s3_client.delete_object(Bucket=self.bucket, Key=f"{folder}/{file_name}")
logger.info(f"File {file_name} deleted successfully from folder {folder}.")
except ClientError:
logger.exception(f"Error deleting file {file_name} from folder {folder}")
raise
async def teardown(self) -> None:
"""Perform any cleanup operations when the service is being torn down."""
# No specific teardown actions required for S3 storage at the moment.