import logging
import time
from io import BytesIO

import boto3
from botocore.exceptions import ClientError

from rag import settings
from rag.utils import singleton

@singleton
class RAGFlowS3:
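    """Singleton S3 storage adapter built on boto3, configured from settings.S3."""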
def __init__(self):
self.conn = None
self.s3_config = settings.S3
self.access_key = self.s3_config.get('access_key', None)
self.secret_key = self.s3_config.get('secret_key', None)
self.region = self.s3_config.get('region', None)
self.__open__()
def __open__(self):
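        """(Re)create the boto3 S3 client, discarding any previous connection."""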
try:
if self.conn:
self.__close__()
except Exception:
pass
try:
self.conn = boto3.client(
's3',
region_name=self.region,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key
)
        except Exception:
            logging.exception(f"Failed to connect to S3 region {self.region}")
    def __close__(self):
        self.conn = None
def bucket_exists(self, bucket):
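        """Return True if the bucket exists and is reachable with these credentials."""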
try:
logging.debug(f"head_bucket bucketname {bucket}")
self.conn.head_bucket(Bucket=bucket)
exists = True
except ClientError:
logging.exception(f"head_bucket error {bucket}")
exists = False
return exists
def health(self):
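        """Connectivity probe: upload a tiny object to a fixed test bucket."""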
bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
if not self.bucket_exists(bucket):
self.conn.create_bucket(Bucket=bucket)
logging.debug(f"create bucket {bucket} ********")
r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
return r
def get_properties(self, bucket, key):
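        # Stub: not implemented for S3; always returns an empty dict.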
return {}
def list(self, bucket, dir, recursive=True):
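        # Stub: not implemented for S3; always returns an empty list.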
return []
def put(self, bucket, fnm, binary):
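        """Upload binary as object fnm, creating the bucket on demand."""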
logging.debug(f"bucket name {bucket}; filename :{fnm}:")
        # range(1): a single attempt; on failure the client is reopened and None is returned.
        for _ in range(1):
try:
if not self.bucket_exists(bucket):
self.conn.create_bucket(Bucket=bucket)
logging.info(f"create bucket {bucket} ********")
r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
return r
            except Exception:
                logging.exception(f"Failed to put {bucket}/{fnm}")
self.__open__()
time.sleep(1)
def rm(self, bucket, fnm):
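        """Delete an object; failures are logged and swallowed."""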
try:
self.conn.delete_object(Bucket=bucket, Key=fnm)
        except Exception:
            logging.exception(f"Failed to remove {bucket}/{fnm}")
def get(self, bucket, fnm):
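        """Fetch an object's bytes; returns None if the read fails."""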
        # Single attempt, as in put(); the client is reopened on failure.
        for _ in range(1):
try:
r = self.conn.get_object(Bucket=bucket, Key=fnm)
object_data = r['Body'].read()
return object_data
            except Exception:
                logging.exception(f"Failed to get {bucket}/{fnm}")
self.__open__()
time.sleep(1)
return
def obj_exist(self, bucket, fnm):
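        """Return True if the object exists, False on a 404; re-raise other client errors."""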
try:
if self.conn.head_object(Bucket=bucket, Key=fnm):
return True
except ClientError as e:
if e.response['Error']['Code'] == '404':
return False
else:
raise
def get_presigned_url(self, bucket, fnm, expires):
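        """Generate a presigned GET URL, retrying up to 10 times; None if all attempts fail."""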
for _ in range(10):
try:
                r = self.conn.generate_presigned_url(
                    'get_object',
                    Params={'Bucket': bucket, 'Key': fnm},
                    ExpiresIn=expires,
                )
return r
            except Exception:
                logging.exception(f"Failed to generate presigned URL for {bucket}/{fnm}")
self.__open__()
time.sleep(1)
return
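
# A minimal usage sketch (hypothetical bucket/key names; assumes settings.S3
# carries valid credentials with permission to create buckets):
#
#   s3 = RAGFlowS3()
#   s3.put("my-bucket", "hello.txt", b"hello world")
#   data = s3.get("my-bucket", "hello.txt")
#   url = s3.get_presigned_url("my-bucket", "hello.txt", expires=3600)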