|
|
|
|
|
|
|
from datetime import datetime |
|
from typing import Optional, cast |
|
|
|
import litellm |
|
from litellm._logging import print_verbose, verbose_logger |
|
from litellm.types.utils import StandardLoggingPayload |
|
|
|
|
|
class S3Logger:
    """Upload litellm standard logging payloads to an S3 bucket as JSON objects.

    The boto3 client is created once in ``__init__``; ``log_event`` serializes
    ``kwargs["standard_logging_object"]`` and stores it with ``put_object``.
    """

    def __init__(
        self,
        s3_bucket_name=None,
        s3_path=None,
        s3_region_name=None,
        s3_api_version=None,
        s3_use_ssl=True,
        s3_verify=None,
        s3_endpoint_url=None,
        s3_aws_access_key_id=None,
        s3_aws_secret_access_key=None,
        s3_aws_session_token=None,
        s3_config=None,
        **kwargs,
    ):
        """Build the boto3 S3 client and remember the target bucket and path.

        When ``litellm.s3_callback_params`` is set, values found in it take
        precedence over the constructor arguments. A constructor argument is
        used as the fallback for any key the dict does not contain, so
        explicitly passed values are no longer silently discarded.

        String values in ``litellm.s3_callback_params`` that start with
        ``"os.environ/"`` are replaced in place by the result of
        ``litellm.get_secret``.

        Args:
            s3_bucket_name: Bucket that receives the log objects.
            s3_path: Optional key prefix placed in front of every object key.
            s3_region_name, s3_api_version, s3_use_ssl, s3_verify,
            s3_endpoint_url, s3_aws_access_key_id, s3_aws_secret_access_key,
            s3_aws_session_token, s3_config: Forwarded to ``boto3.client``.
            **kwargs: Extra keyword arguments forwarded to ``boto3.client``.

        Raises:
            Exception: Any error raised while resolving the parameters or
                creating the client is logged and re-raised.
        """
        # Imported here rather than at module level, so importing this module
        # does not itself import boto3.
        import boto3

        try:
            verbose_logger.debug(
                f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
            )

            s3_use_team_prefix = False

            callback_params = litellm.s3_callback_params
            if callback_params is not None:
                # Resolve "os.environ/..." references. Only existing keys are
                # reassigned, so the dict does not change size mid-iteration.
                for key, value in callback_params.items():
                    if isinstance(value, str) and value.startswith("os.environ/"):
                        callback_params[key] = litellm.get_secret(value)

                # The callback params win; the constructor argument is the
                # fallback when a key is absent.
                s3_bucket_name = callback_params.get("s3_bucket_name", s3_bucket_name)
                s3_region_name = callback_params.get("s3_region_name", s3_region_name)
                s3_api_version = callback_params.get("s3_api_version", s3_api_version)
                s3_use_ssl = callback_params.get("s3_use_ssl", s3_use_ssl)
                s3_verify = callback_params.get("s3_verify", s3_verify)
                s3_endpoint_url = callback_params.get(
                    "s3_endpoint_url", s3_endpoint_url
                )
                s3_aws_access_key_id = callback_params.get(
                    "s3_aws_access_key_id", s3_aws_access_key_id
                )
                s3_aws_secret_access_key = callback_params.get(
                    "s3_aws_secret_access_key", s3_aws_secret_access_key
                )
                s3_aws_session_token = callback_params.get(
                    "s3_aws_session_token", s3_aws_session_token
                )
                s3_config = callback_params.get("s3_config", s3_config)
                s3_path = callback_params.get("s3_path", s3_path)

                s3_use_team_prefix = bool(
                    callback_params.get("s3_use_team_prefix", False)
                )

            self.s3_use_team_prefix = s3_use_team_prefix
            self.bucket_name = s3_bucket_name
            self.s3_path = s3_path
            verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")

            self.s3_client = boto3.client(
                "s3",
                region_name=s3_region_name,
                endpoint_url=s3_endpoint_url,
                api_version=s3_api_version,
                use_ssl=s3_use_ssl,
                verify=s3_verify,
                aws_access_key_id=s3_aws_access_key_id,
                aws_secret_access_key=s3_aws_secret_access_key,
                aws_session_token=s3_aws_session_token,
                config=s3_config,
                **kwargs,
            )
        except Exception as e:
            print_verbose(f"Got exception on init s3 client {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise

    async def _async_log_event(
        self, kwargs, response_obj, start_time, end_time, print_verbose
    ):
        """Async entry point: run ``log_event`` without blocking the event loop.

        ``log_event`` performs a synchronous ``put_object`` network call, so it
        is dispatched to the loop's default executor and awaited there instead
        of being called inline. Like before, nothing is returned.
        """
        import asyncio

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            self.log_event,
            kwargs,
            response_obj,
            start_time,
            end_time,
            print_verbose,
        )

    def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
        """Serialize the standard logging payload and upload it to S3.

        Args:
            kwargs: Call kwargs; only ``"standard_logging_object"`` is read.
            response_obj: Only echoed through ``print_verbose``.
            start_time: ``datetime`` used for the date folder of the object key
                and for the download filename.
            end_time: Unused.
            print_verbose: Callable used for verbose output. It shadows the
                module-level ``print_verbose`` inside this method.

        Returns:
            The ``put_object`` response, or ``None`` when no
            ``standard_logging_object`` is present or when any error occurs.
            Errors are logged and swallowed so logging never breaks the caller.
        """
        import json

        try:
            verbose_logger.debug(
                f"s3 Logging - Enters logging function for model {kwargs}"
            )

            payload: Optional[StandardLoggingPayload] = cast(
                Optional[StandardLoggingPayload],
                kwargs.get("standard_logging_object", None),
            )
            if payload is None:
                return None

            # Tolerate a missing/None metadata entry: the object is still
            # uploaded, just without a team prefix.
            payload_metadata = payload.get("metadata") or {}
            team_alias = payload_metadata.get("user_api_key_team_alias")

            team_alias_prefix = ""
            if (
                litellm.enable_preview_features
                and self.s3_use_team_prefix
                and team_alias is not None
            ):
                team_alias_prefix = f"{team_alias}/"

            s3_file_name = litellm.utils.get_logging_id(start_time, payload) or ""
            s3_object_key = get_s3_object_key(
                cast(Optional[str], self.s3_path) or "",
                team_alias_prefix,
                start_time,
                s3_file_name,
            )

            # Filename offered to clients via the Content-Disposition header.
            s3_object_download_filename = (
                "time-"
                + start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")
                + "_"
                + payload["id"]
                + ".json"
            )

            payload_str = json.dumps(payload)

            print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")

            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_object_key,
                Body=payload_str,
                ContentType="application/json",
                ContentLanguage="en",
                ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
                CacheControl="private, immutable, max-age=31536000, s-maxage=0",
            )

            print_verbose(f"Response from s3:{str(response)}")

            print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
            return response
        except Exception as e:
            # Best-effort logging: record the failure, never propagate it.
            verbose_logger.exception(f"s3 Layer Error - {str(e)}")
            return None
|
|
|
|
|
def get_s3_object_key(
    s3_path: str,
    team_alias_prefix: str,
    start_time: datetime,
    s3_file_name: str,
) -> str:
    """Assemble the S3 object key for a single log entry.

    The key has the form
    ``[<s3_path>/][<team_alias_prefix>]<YYYY-MM-DD>/<s3_file_name>.json``.

    Args:
        s3_path: Optional key prefix. Trailing slashes are stripped and a
            single ``/`` is appended; an empty value adds no prefix.
        team_alias_prefix: Inserted verbatim after the path prefix; the caller
            supplies any trailing ``/``.
        start_time: Its date, formatted as ``%Y-%m-%d``, forms the folder name.
        s3_file_name: Object name without the ``.json`` extension.

    Returns:
        The assembled object key.
    """
    if s3_path:
        path_prefix = s3_path.rstrip("/") + "/"
    else:
        path_prefix = ""

    date_folder = start_time.strftime("%Y-%m-%d")

    key_parts = (
        path_prefix,
        team_alias_prefix,
        date_folder,
        "/",
        s3_file_name,
        ".json",
    )
    return "".join(key_parts)
|
|