|
""" |
|
S3 Cache implementation |
|
WARNING: DO NOT USE THIS IN PRODUCTION - This is not ASYNC |
|
|
|
Has 4 methods: |
|
- set_cache |
|
- get_cache |
|
- async_set_cache |
|
- async_get_cache |
|
""" |
|
|
|
import ast
import asyncio
import json
from typing import Optional

from litellm._logging import print_verbose, verbose_logger

from .base_cache import BaseCache


class S3Cache(BaseCache):
    def __init__(
        self,
        s3_bucket_name,
        s3_region_name=None,
        s3_api_version=None,
        s3_use_ssl: Optional[bool] = True,
        s3_verify=None,
        s3_endpoint_url=None,
        s3_aws_access_key_id=None,
        s3_aws_secret_access_key=None,
        s3_aws_session_token=None,
        s3_config=None,
        s3_path=None,
        **kwargs,
    ):
        # lazy import so boto3 is only required when the S3 cache is used
        import boto3

        self.bucket_name = s3_bucket_name
        # normalize the optional path prefix to exactly one trailing slash
        self.key_prefix = s3_path.rstrip("/") + "/" if s3_path else ""

        self.s3_client = boto3.client(
            "s3",
            region_name=s3_region_name,
            endpoint_url=s3_endpoint_url,
            api_version=s3_api_version,
            use_ssl=s3_use_ssl,
            verify=s3_verify,
            aws_access_key_id=s3_aws_access_key_id,
            aws_secret_access_key=s3_aws_secret_access_key,
            aws_session_token=s3_aws_session_token,
            config=s3_config,
            **kwargs,
        )
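    # Construction sketch: the client also works with S3-compatible stores,
    # assuming the endpoint speaks the S3 API (all values below are
    # hypothetical placeholders):
    #
    #     cache = S3Cache(
    #         s3_bucket_name="llm-cache",
    #         s3_endpoint_url="http://localhost:9000",  # e.g. a local MinIO
    #         s3_aws_access_key_id="minio",
    #         s3_aws_secret_access_key="minio123",
    #     )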
|
|
|
    def set_cache(self, key, value, **kwargs):
        try:
            print_verbose(f"LiteLLM SET Cache - S3. Key={key}. Value={value}")
            ttl = kwargs.get("ttl", None)

            serialized_value = json.dumps(value)
            key = self.key_prefix + key

            if ttl is not None:
                cache_control = f"immutable, max-age={ttl}, s-maxage={ttl}"
                import datetime

                # ttl is a number of seconds; it must be wrapped in a
                # timedelta before it can be added to a datetime
                expiration_time = datetime.datetime.now() + datetime.timedelta(
                    seconds=ttl
                )

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=key,
                    Body=serialized_value,
                    Expires=expiration_time,
                    CacheControl=cache_control,
                    ContentType="application/json",
                    ContentLanguage="en",
                    ContentDisposition=f'inline; filename="{key}.json"',
                )
            else:
                cache_control = "immutable, max-age=31536000, s-maxage=31536000"

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=key,
                    Body=serialized_value,
                    CacheControl=cache_control,
                    ContentType="application/json",
                    ContentLanguage="en",
                    ContentDisposition=f'inline; filename="{key}.json"',
                )
        except Exception as e:
            # non-blocking: log the S3 failure instead of raising
            print_verbose(f"S3 Caching: set_cache() - Got exception from S3: {e}")
|
|
|
    async def async_set_cache(self, key, value, **kwargs):
        # NOTE: not truly async - this calls the blocking sync method
        self.set_cache(key=key, value=value, **kwargs)
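    # A minimal sketch of a truly non-blocking variant, assuming Python 3.9+
    # for asyncio.to_thread. The method name is hypothetical (not part of the
    # upstream API); it offloads the blocking boto3 call to a worker thread
    # so the event loop is not stalled.
    async def async_set_cache_in_thread(self, key, value, **kwargs):
        await asyncio.to_thread(self.set_cache, key, value, **kwargs)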
|
|
|
    def get_cache(self, key, **kwargs):
        # import the submodule explicitly so botocore.exceptions is
        # guaranteed to be available below
        import botocore.exceptions

        try:
            key = self.key_prefix + key

            print_verbose(f"Get S3 Cache: key: {key}")

            cached_response = self.s3_client.get_object(
                Bucket=self.bucket_name, Key=key
            )

            if cached_response is not None:
                # the object body is a stream; read and decode it to a string
                cached_response = cached_response["Body"].read().decode("utf-8")
                try:
                    cached_response = json.loads(cached_response)
                except Exception:
                    # fall back for values stored as Python literals
                    cached_response = ast.literal_eval(cached_response)
            if type(cached_response) is not dict:
                cached_response = dict(cached_response)
            verbose_logger.debug(
                f"Got S3 Cache: key: {key}, cached_response {cached_response}. Type Response {type(cached_response)}"
            )

            return cached_response
        except botocore.exceptions.ClientError as e:
            # a cache miss surfaces as a NoSuchKey error
            if e.response["Error"]["Code"] == "NoSuchKey":
                verbose_logger.debug(
                    f"S3 Cache: The specified key '{key}' does not exist in the S3 bucket."
                )
                return None
        except Exception as e:
            # non-blocking: log the S3 failure and fall through to a cache miss
            verbose_logger.error(
                f"S3 Caching: get_cache() - Got exception from S3: {e}"
            )
|
|
|
    async def async_get_cache(self, key, **kwargs):
        # NOTE: not truly async - this calls the blocking sync method
        return self.get_cache(key=key, **kwargs)
|
|
|
    def flush_cache(self):
        pass

    async def disconnect(self):
        pass
|
|
|
    async def async_set_cache_pipeline(self, cache_list, **kwargs):
        # gather offers no real concurrency here, since async_set_cache wraps
        # a blocking call, but it keeps the interface consistent
        tasks = []
        for val in cache_list:
            tasks.append(self.async_set_cache(val[0], val[1], **kwargs))
        await asyncio.gather(*tasks)
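
if __name__ == "__main__":
    # A minimal end-to-end sketch, assuming valid AWS credentials and an
    # existing bucket; the bucket name and keys below are hypothetical.
    cache = S3Cache(s3_bucket_name="my-llm-cache", s3_path="litellm")
    cache.set_cache("chat:abc123", {"response": "hello"}, ttl=3600)
    print(cache.get_cache("chat:abc123"))  # -> {"response": "hello"}
    print(cache.get_cache("no-such-key"))  # -> None (NoSuchKey is handled)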
|
|