import base64
import os.path
import traceback
import uuid
from pathlib import Path
from typing import Optional

import aioboto3
import aiofiles

from metagpt.config import CONFIG
from metagpt.const import BASE64_FORMAT
from metagpt.logs import logger


class S3:
    """A class for interacting with Amazon S3 storage."""

    def __init__(self):
        self.session = aioboto3.Session()
        self.s3_config = CONFIG.S3
        # Connection settings shared by every client this instance creates.
        self.auth_config = {
            "service_name": "s3",
            "aws_access_key_id": self.s3_config["access_key"],
            "aws_secret_access_key": self.s3_config["secret_key"],
            "endpoint_url": self.s3_config["endpoint_url"],
        }

    async def upload_file(
        self,
        bucket: str,
        local_path: str,
        object_name: str,
    ) -> None:
        """Upload a file from the local path to the specified path of the storage bucket specified in s3.

        Args:
            bucket: The name of the S3 storage bucket.
            local_path: The local file path, including the file name.
            object_name: The complete path of the uploaded file to be stored in S3, including the file name.

        Raises:
            Exception: If an error occurs during the upload process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                async with aiofiles.open(local_path, mode="rb") as reader:
                    body = await reader.read()
                await client.put_object(Body=body, Bucket=bucket, Key=object_name)
                logger.info(f"Successfully uploaded the file to path {object_name} in bucket {bucket} of s3.")
        except Exception as e:
            logger.error(f"Failed to upload the file to path {object_name} in bucket {bucket} of s3: {e}")
            # Bare raise preserves the original traceback (raise e would rebind it).
            raise

    async def get_object_url(
        self,
        bucket: str,
        object_name: str,
    ) -> str:
        """Get the URL for a downloadable or preview file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The URL for the downloadable or preview file.

        Raises:
            Exception: If an error occurs while retrieving the URL, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                file = await client.get_object(Bucket=bucket, Key=object_name)
                return str(file["Body"].url)
        except Exception as e:
            logger.error(f"Failed to get the url for a downloadable or preview file: {e}")
            raise

    async def get_object(
        self,
        bucket: str,
        object_name: str,
    ) -> bytes:
        """Get the binary data of a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The binary data of the requested file.

        Raises:
            Exception: If an error occurs while retrieving the file data, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                return await s3_object["Body"].read()
        except Exception as e:
            logger.error(f"Failed to get the binary data of the file: {e}")
            raise

    async def download_file(
        self, bucket: str, object_name: str, local_path: str, chunk_size: Optional[int] = 128 * 1024
    ) -> None:
        """Download an S3 object to a local file.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.
            local_path: The local file path where the S3 object will be downloaded.
            chunk_size: The size of data chunks to read and write at a time. Default is 128 KB.

        Raises:
            Exception: If an error occurs during the download process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                stream = s3_object["Body"]
                async with aiofiles.open(local_path, mode="wb") as writer:
                    # Stream in bounded chunks so large objects never sit fully in memory.
                    while file_data := await stream.read(chunk_size):
                        await writer.write(file_data)
        except Exception as e:
            logger.error(f"Failed to download the file from S3: {e}")
            raise

    async def cache(self, data: str, file_ext: str, format: str = "") -> Optional[str]:
        """Save data to remote S3 and return its URL, or None on failure.

        Args:
            data: The file content. Plain text, or a base64-encoded string when
                ``format`` is ``BASE64_FORMAT``.
            file_ext: The file extension (including the leading dot) appended to
                the generated object name.
            format: Set to ``BASE64_FORMAT`` when ``data`` is base64-encoded.

        Returns:
            The uploaded object's URL on success, otherwise ``None``.
        """
        object_name = uuid.uuid4().hex + file_ext
        # NOTE(review): the temp file is staged next to this module; assumes the
        # package directory is writable — consider tempfile.gettempdir() instead.
        pathname = Path(__file__).parent / object_name
        try:
            async with aiofiles.open(str(pathname), mode="wb") as file:
                if format == BASE64_FORMAT:
                    body = base64.b64decode(data)
                else:
                    # BUGFIX: the file is opened in binary mode, so a plain-text
                    # payload must be encoded first; writing a str raises TypeError.
                    body = data.encode("utf-8") if isinstance(data, str) else data
                await file.write(body)

            bucket = CONFIG.S3.get("bucket")
            object_pathname = CONFIG.S3.get("path") or "system"
            object_pathname += f"/{object_name}"
            # BUGFIX: os.path.normpath emits "\\" separators on Windows, which are
            # invalid in S3 object keys; normalize back to "/".
            object_pathname = os.path.normpath(object_pathname).replace(os.sep, "/")
            await self.upload_file(bucket=bucket, local_path=str(pathname), object_name=object_pathname)
            pathname.unlink(missing_ok=True)

            return await self.get_object_url(bucket=bucket, object_name=object_pathname)
        except Exception as e:
            # Best-effort API: log, clean up the staging file, and signal failure
            # with None rather than propagating.
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
            pathname.unlink(missing_ok=True)
            return None