File size: 7,438 Bytes
e3278e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
#### What this does ####
# On success + failure, log events to an S3 bucket
from datetime import datetime
from typing import Optional, cast
import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.types.utils import StandardLoggingPayload
class S3Logger:
    """Write LiteLLM standard logging payloads to an S3 bucket as JSON objects.

    The boto3 S3 client is created once in ``__init__``; each call to
    ``log_event`` uploads one object built from
    ``kwargs["standard_logging_object"]``.
    """

    def __init__(
        self,
        s3_bucket_name=None,
        s3_path=None,
        s3_region_name=None,
        s3_api_version=None,
        s3_use_ssl=True,
        s3_verify=None,
        s3_endpoint_url=None,
        s3_aws_access_key_id=None,
        s3_aws_secret_access_key=None,
        s3_aws_session_token=None,
        s3_config=None,
        **kwargs,
    ):
        """Create the S3 client used for uploads.

        When ``litellm.s3_callback_params`` is not ``None`` it takes
        precedence: every ``s3_*`` argument listed in the signature is
        replaced by the value found in that dict (missing keys become
        ``None``, except ``s3_use_ssl`` which defaults to ``True``). String
        values starting with ``os.environ/`` are first resolved through
        ``litellm.get_secret`` and written back into the dict.

        Args:
            s3_bucket_name: Bucket that receives the log objects.
            s3_path: Optional key prefix placed in front of every object key.
            s3_region_name, s3_api_version, s3_use_ssl, s3_verify,
            s3_endpoint_url, s3_aws_access_key_id, s3_aws_secret_access_key,
            s3_aws_session_token, s3_config: Forwarded to ``boto3.client``.
            **kwargs: Extra keyword arguments forwarded to ``boto3.client``.

        Raises:
            Exception: Any error raised while reading the parameters or
                creating the client is reported via ``print_verbose`` and
                re-raised unchanged.
        """
        # Imported lazily so boto3 is only required when this logger is used.
        import boto3

        try:
            # NOTE(review): this logs the raw callback params, which may include
            # credential values if they were configured literally - confirm that
            # is acceptable at debug level.
            verbose_logger.debug(
                f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
            )

            s3_use_team_prefix = False
            if litellm.s3_callback_params is not None:
                # read in .env variables - example os.environ/AWS_BUCKET_NAME
                # Iterate over a snapshot because the dict is updated in the loop.
                for key, value in list(litellm.s3_callback_params.items()):
                    if isinstance(value, str) and value.startswith("os.environ/"):
                        litellm.s3_callback_params[key] = litellm.get_secret(value)

                # now set s3 params from litellm.s3_callback_params
                s3_bucket_name = litellm.s3_callback_params.get("s3_bucket_name")
                s3_region_name = litellm.s3_callback_params.get("s3_region_name")
                s3_api_version = litellm.s3_callback_params.get("s3_api_version")
                s3_use_ssl = litellm.s3_callback_params.get("s3_use_ssl", True)
                s3_verify = litellm.s3_callback_params.get("s3_verify")
                s3_endpoint_url = litellm.s3_callback_params.get("s3_endpoint_url")
                s3_aws_access_key_id = litellm.s3_callback_params.get(
                    "s3_aws_access_key_id"
                )
                s3_aws_secret_access_key = litellm.s3_callback_params.get(
                    "s3_aws_secret_access_key"
                )
                s3_aws_session_token = litellm.s3_callback_params.get(
                    "s3_aws_session_token"
                )
                s3_config = litellm.s3_callback_params.get("s3_config")
                s3_path = litellm.s3_callback_params.get("s3_path")
                # done reading litellm.s3_callback_params
                s3_use_team_prefix = bool(
                    litellm.s3_callback_params.get("s3_use_team_prefix", False)
                )

            self.s3_use_team_prefix = s3_use_team_prefix
            self.bucket_name = s3_bucket_name
            self.s3_path = s3_path
            verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")

            # Create an S3 client with custom endpoint URL
            self.s3_client = boto3.client(
                "s3",
                region_name=s3_region_name,
                endpoint_url=s3_endpoint_url,
                api_version=s3_api_version,
                use_ssl=s3_use_ssl,
                verify=s3_verify,
                aws_access_key_id=s3_aws_access_key_id,
                aws_secret_access_key=s3_aws_secret_access_key,
                aws_session_token=s3_aws_session_token,
                config=s3_config,
                **kwargs,
            )
        except Exception as e:
            print_verbose(f"Got exception on init s3 client {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise

    async def _async_log_event(
        self, kwargs, response_obj, start_time, end_time, print_verbose
    ):
        """Async entry point; delegates to the synchronous ``log_event``.

        The upload is executed inline (nothing is awaited) and the result of
        ``log_event`` is not returned.
        """
        self.log_event(kwargs, response_obj, start_time, end_time, print_verbose)

    def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
        """Upload the standard logging payload for one call to S3.

        Args:
            kwargs: Call kwargs; the payload is read from
                ``kwargs["standard_logging_object"]``. Nothing is uploaded
                when that entry is missing or ``None``.
            response_obj: Only echoed through ``print_verbose``.
            start_time: ``datetime`` used for the date folder in the object
                key and for the download filename.
            end_time: Unused.
            print_verbose: Callable used for verbose output (shadows the
                module-level helper of the same name).

        Returns:
            The ``put_object`` response on success; ``None`` when there is no
            payload or when any exception occurred (exceptions are logged via
            ``verbose_logger.exception`` and deliberately not propagated).
        """
        import json

        try:
            verbose_logger.debug(
                f"s3 Logging - Enters logging function for model {kwargs}"
            )

            payload: Optional[StandardLoggingPayload] = cast(
                Optional[StandardLoggingPayload],
                kwargs.get("standard_logging_object", None),
            )
            if payload is None:
                return

            # Optional "<team alias>/" folder, only when the preview feature flag
            # and the s3_use_team_prefix setting are both enabled.
            team_alias = payload["metadata"].get("user_api_key_team_alias")
            team_alias_prefix = ""
            if (
                litellm.enable_preview_features
                and self.s3_use_team_prefix
                and team_alias is not None
            ):
                team_alias_prefix = f"{team_alias}/"

            s3_file_name = litellm.utils.get_logging_id(start_time, payload) or ""
            s3_object_key = get_s3_object_key(
                cast(Optional[str], self.s3_path) or "",
                team_alias_prefix,
                start_time,
                s3_file_name,
            )

            # Filename suggested to clients through the Content-Disposition header.
            s3_object_download_filename = (
                "time-"
                + start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")
                + "_"
                + payload["id"]
                + ".json"
            )

            payload_str = json.dumps(payload)

            print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")

            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_object_key,
                Body=payload_str,
                ContentType="application/json",
                ContentLanguage="en",
                ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
                CacheControl="private, immutable, max-age=31536000, s-maxage=0",
            )

            print_verbose(f"Response from s3:{str(response)}")

            print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
            return response
        except Exception as e:
            # Best-effort logging: record the failure but never break the caller.
            verbose_logger.exception(f"s3 Layer Error - {str(e)}")
def get_s3_object_key(
    s3_path: str,
    team_alias_prefix: str,
    start_time: datetime,
    s3_file_name: str,
) -> str:
    """Build the S3 object key for one log entry.

    The key has the form
    ``[<s3_path>/]<team_alias_prefix><YYYY-MM-DD>/<s3_file_name>.json``.
    Trailing slashes on ``s3_path`` are collapsed to a single separator and
    an empty ``s3_path`` contributes nothing. ``team_alias_prefix`` is used
    verbatim, so it must already carry its own trailing slash if one is wanted.
    """
    # Normalise the optional base path so it ends with exactly one "/".
    base_folder = ""
    if s3_path:
        base_folder = s3_path.rstrip("/") + "/"

    # we need the s3 key to include the time, so we log cache hits too
    date_folder = start_time.strftime("%Y-%m-%d")

    key_parts = (
        base_folder,
        team_alias_prefix,
        date_folder,
        "/",
        s3_file_name,
        ".json",
    )
    return "".join(key_parts)
|