import copy |
|
import datetime |
|
import json |
|
import os |
|
import re |
|
import subprocess |
|
import sys |
|
import time |
|
import traceback |
|
import uuid |
|
from datetime import datetime as dt_object |
|
from functools import lru_cache |
|
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast |
|
|
|
from pydantic import BaseModel |
|
|
|
import litellm |
|
from litellm import ( |
|
_custom_logger_compatible_callbacks_literal, |
|
json_logs, |
|
log_raw_request_response, |
|
turn_off_message_logging, |
|
) |
|
from litellm._logging import _is_debugging_on, verbose_logger |
|
from litellm.caching.caching import DualCache, InMemoryCache |
|
from litellm.caching.caching_handler import LLMCachingHandler |
|
from litellm.cost_calculator import _select_model_name_for_cost_calc |
|
from litellm.integrations.custom_guardrail import CustomGuardrail |
|
from litellm.integrations.custom_logger import CustomLogger |
|
from litellm.integrations.mlflow import MlflowLogger |
|
from litellm.integrations.pagerduty.pagerduty import PagerDutyAlerting |
|
from litellm.litellm_core_utils.get_litellm_params import get_litellm_params |
|
from litellm.litellm_core_utils.redact_messages import ( |
|
redact_message_input_output_from_custom_logger, |
|
redact_message_input_output_from_logging, |
|
) |
|
from litellm.types.llms.openai import ( |
|
AllMessageValues, |
|
Batch, |
|
FineTuningJob, |
|
HttpxBinaryResponseContent, |
|
) |
|
from litellm.types.rerank import RerankResponse |
|
from litellm.types.router import SPECIAL_MODEL_INFO_PARAMS |
|
from litellm.types.utils import ( |
|
CallTypes, |
|
EmbeddingResponse, |
|
ImageResponse, |
|
LiteLLMLoggingBaseClass, |
|
ModelResponse, |
|
ModelResponseStream, |
|
StandardCallbackDynamicParams, |
|
StandardLoggingAdditionalHeaders, |
|
StandardLoggingHiddenParams, |
|
StandardLoggingMetadata, |
|
StandardLoggingModelCostFailureDebugInformation, |
|
StandardLoggingModelInformation, |
|
StandardLoggingPayload, |
|
StandardLoggingPayloadErrorInformation, |
|
StandardLoggingPayloadStatus, |
|
StandardLoggingPromptManagementMetadata, |
|
TextCompletionResponse, |
|
TranscriptionResponse, |
|
Usage, |
|
) |
|
from litellm.utils import _get_base_model_from_metadata, executor, print_verbose |
|
|
|
from ..integrations.argilla import ArgillaLogger |
|
from ..integrations.arize_ai import ArizeLogger |
|
from ..integrations.athina import AthinaLogger |
|
from ..integrations.azure_storage.azure_storage import AzureBlobStorageLogger |
|
from ..integrations.braintrust_logging import BraintrustLogger |
|
from ..integrations.datadog.datadog import DataDogLogger |
|
from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger |
|
from ..integrations.dynamodb import DyanmoDBLogger |
|
from ..integrations.galileo import GalileoObserve |
|
from ..integrations.gcs_bucket.gcs_bucket import GCSBucketLogger |
|
from ..integrations.gcs_pubsub.pub_sub import GcsPubSubLogger |
|
from ..integrations.greenscale import GreenscaleLogger |
|
from ..integrations.helicone import HeliconeLogger |
|
from ..integrations.humanloop import HumanloopLogger |
|
from ..integrations.lago import LagoLogger |
|
from ..integrations.langfuse.langfuse import LangFuseLogger |
|
from ..integrations.langfuse.langfuse_handler import LangFuseHandler |
|
from ..integrations.langfuse.langfuse_prompt_management import LangfusePromptManagement |
|
from ..integrations.langsmith import LangsmithLogger |
|
from ..integrations.literal_ai import LiteralAILogger |
|
from ..integrations.logfire_logger import LogfireLevel, LogfireLogger |
|
from ..integrations.lunary import LunaryLogger |
|
from ..integrations.openmeter import OpenMeterLogger |
|
from ..integrations.opik.opik import OpikLogger |
|
from ..integrations.prometheus import PrometheusLogger |
|
from ..integrations.prompt_layer import PromptLayerLogger |
|
from ..integrations.s3 import S3Logger |
|
from ..integrations.supabase import Supabase |
|
from ..integrations.traceloop import TraceloopLogger |
|
from ..integrations.weights_biases import WeightsBiasesLogger |
|
from .exception_mapping_utils import _get_response_headers |
|
from .initialize_dynamic_callback_params import ( |
|
initialize_standard_callback_dynamic_params as _initialize_standard_callback_dynamic_params, |
|
) |
|
from .logging_utils import _assemble_complete_response_from_streaming_chunks |
|
from .specialty_caches.dynamic_logging_cache import DynamicLoggingCache |
|
|
|
try: |
|
from ..proxy.enterprise.enterprise_callbacks.generic_api_callback import ( |
|
GenericAPILogger, |
|
) |
|
except Exception as e: |
|
verbose_logger.debug( |
|
f"[Non-Blocking] Unable to import GenericAPILogger - LiteLLM Enterprise Feature - {str(e)}" |
|
) |
|
|
|
_in_memory_loggers: List[Any] = [] |
|
|
|
|
|
|
|
sentry_sdk_instance = None |
|
capture_exception = None |
|
add_breadcrumb = None |
|
posthog = None |
|
slack_app = None |
|
alerts_channel = None |
|
heliconeLogger = None |
|
athinaLogger = None |
|
promptLayerLogger = None |
|
logfireLogger = None |
|
weightsBiasesLogger = None |
|
customLogger = None |
|
langFuseLogger = None |
|
openMeterLogger = None |
|
lagoLogger = None |
|
dataDogLogger = None |
|
prometheusLogger = None |
|
dynamoLogger = None |
|
s3Logger = None |
|
genericAPILogger = None |
|
greenscaleLogger = None |
|
lunaryLogger = None |
|
supabaseClient = None |
|
callback_list: Optional[List[str]] = [] |
|
user_logger_fn = None |
|
additional_details: Optional[Dict[str, str]] = {} |
|
local_cache: Optional[Dict[str, str]] = {} |
|
last_fetched_at = None |
|
last_fetched_at_keys = None |
|
|
|
|
|
|
|
class ServiceTraceIDCache: |
|
def __init__(self) -> None: |
|
self.cache = InMemoryCache() |
|
|
|
def get_cache(self, litellm_call_id: str, service_name: str) -> Optional[str]: |
|
key_name = "{}:{}".format(service_name, litellm_call_id) |
|
response = self.cache.get_cache(key=key_name) |
|
return response |
|
|
|
def set_cache(self, litellm_call_id: str, service_name: str, trace_id: str) -> None: |
|
key_name = "{}:{}".format(service_name, litellm_call_id) |
|
self.cache.set_cache(key=key_name, value=trace_id) |
|
return None |
|
|
|
|
|
in_memory_trace_id_cache = ServiceTraceIDCache() |
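# Illustrative usage of the trace-id cache (a sketch, not executed; the ids below are hypothetical):
#
#   in_memory_trace_id_cache.set_cache(
#       litellm_call_id="abc-123", service_name="langfuse", trace_id="trace-1"
#   )
#   in_memory_trace_id_cache.get_cache(litellm_call_id="abc-123", service_name="langfuse")
#   # -> "trace-1"; entries are keyed as "<service_name>:<litellm_call_id>"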
|
in_memory_dynamic_logger_cache = DynamicLoggingCache() |
|
|
|
|
|
class Logging(LiteLLMLoggingBaseClass): |
|
    global supabaseClient, promptLayerLogger, weightsBiasesLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, prometheusLogger, slack_app
|
custom_pricing: bool = False |
|
stream_options = None |
|
|
|
def __init__( |
|
self, |
|
model: str, |
|
messages, |
|
stream, |
|
call_type, |
|
start_time, |
|
litellm_call_id: str, |
|
function_id: str, |
|
litellm_trace_id: Optional[str] = None, |
|
dynamic_input_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = None, |
|
dynamic_success_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = None, |
|
dynamic_async_success_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = None, |
|
dynamic_failure_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = None, |
|
dynamic_async_failure_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = None, |
|
kwargs: Optional[Dict] = None, |
|
): |
|
        _input: Optional[Any] = messages  # preserve the original input; may be a string or a list of messages
|
if messages is not None: |
|
if isinstance(messages, str): |
|
messages = [ |
|
{"role": "user", "content": messages} |
|
] |
|
elif ( |
|
isinstance(messages, list) |
|
and len(messages) > 0 |
|
and isinstance(messages[0], str) |
|
): |
|
new_messages = [] |
|
for m in messages: |
|
new_messages.append({"role": "user", "content": m}) |
|
messages = new_messages |
|
self.model = model |
|
self.messages = copy.deepcopy(messages) |
|
self.stream = stream |
|
self.start_time = start_time |
|
self.call_type = call_type |
|
self.litellm_call_id = litellm_call_id |
|
self.litellm_trace_id = litellm_trace_id |
|
self.function_id = function_id |
|
self.streaming_chunks: List[Any] = [] |
|
        self.sync_streaming_chunks: List[Any] = []
|
|
|
|
|
self.dynamic_input_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = dynamic_input_callbacks |
|
self.dynamic_success_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = dynamic_success_callbacks |
|
self.dynamic_async_success_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = dynamic_async_success_callbacks |
|
self.dynamic_failure_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = dynamic_failure_callbacks |
|
self.dynamic_async_failure_callbacks: Optional[ |
|
List[Union[str, Callable, CustomLogger]] |
|
] = dynamic_async_failure_callbacks |
|
|
|
|
|
self.process_dynamic_callbacks() |
|
|
|
|
|
self.standard_callback_dynamic_params: StandardCallbackDynamicParams = ( |
|
self.initialize_standard_callback_dynamic_params(kwargs) |
|
) |
|
|
|
|
|
self.completion_start_time: Optional[datetime.datetime] = None |
|
self._llm_caching_handler: Optional[LLMCachingHandler] = None |
|
|
|
|
|
litellm_params = {} |
|
if kwargs is not None: |
|
litellm_params = get_litellm_params(**kwargs) |
|
litellm_params = scrub_sensitive_keys_in_metadata(litellm_params) |
|
|
|
self.litellm_params = litellm_params |
|
|
|
self.model_call_details: Dict[str, Any] = { |
|
"litellm_trace_id": litellm_trace_id, |
|
"litellm_call_id": litellm_call_id, |
|
"input": _input, |
|
"litellm_params": litellm_params, |
|
} |
|
|
|
def process_dynamic_callbacks(self): |
|
""" |
|
Initializes CustomLogger compatible callbacks in self.dynamic_* callbacks |
|
|
|
If a callback is in litellm._known_custom_logger_compatible_callbacks, it needs to be intialized and added to the respective dynamic_* callback list. |
|
""" |
|
|
|
self.dynamic_input_callbacks = self._process_dynamic_callback_list( |
|
self.dynamic_input_callbacks, dynamic_callbacks_type="input" |
|
) |
|
|
|
|
|
self.dynamic_failure_callbacks = self._process_dynamic_callback_list( |
|
self.dynamic_failure_callbacks, dynamic_callbacks_type="failure" |
|
) |
|
|
|
|
|
self.dynamic_async_failure_callbacks = self._process_dynamic_callback_list( |
|
self.dynamic_async_failure_callbacks, dynamic_callbacks_type="async_failure" |
|
) |
|
|
|
|
|
self.dynamic_success_callbacks = self._process_dynamic_callback_list( |
|
self.dynamic_success_callbacks, dynamic_callbacks_type="success" |
|
) |
|
|
|
|
|
self.dynamic_async_success_callbacks = self._process_dynamic_callback_list( |
|
self.dynamic_async_success_callbacks, dynamic_callbacks_type="async_success" |
|
) |
|
|
|
def _process_dynamic_callback_list( |
|
self, |
|
callback_list: Optional[List[Union[str, Callable, CustomLogger]]], |
|
dynamic_callbacks_type: Literal[ |
|
"input", "success", "failure", "async_success", "async_failure" |
|
], |
|
) -> Optional[List[Union[str, Callable, CustomLogger]]]: |
|
""" |
|
Helper function to initialize CustomLogger compatible callbacks in self.dynamic_* callbacks |
|
|
|
- If a callback is in litellm._known_custom_logger_compatible_callbacks, |
|
replace the string with the initialized callback class. |
|
- If dynamic callback is a "success" callback that is a known_custom_logger_compatible_callbacks then add it to dynamic_async_success_callbacks |
|
- If dynamic callback is a "failure" callback that is a known_custom_logger_compatible_callbacks then add it to dynamic_failure_callbacks |
|
""" |
|
if callback_list is None: |
|
return None |
|
|
|
processed_list: List[Union[str, Callable, CustomLogger]] = [] |
|
for callback in callback_list: |
|
if ( |
|
isinstance(callback, str) |
|
and callback in litellm._known_custom_logger_compatible_callbacks |
|
): |
|
callback_class = _init_custom_logger_compatible_class( |
|
callback, internal_usage_cache=None, llm_router=None |
|
) |
|
if callback_class is not None: |
|
processed_list.append(callback_class) |
|
|
|
|
|
if dynamic_callbacks_type == "success": |
|
if self.dynamic_async_success_callbacks is None: |
|
self.dynamic_async_success_callbacks = [] |
|
self.dynamic_async_success_callbacks.append(callback_class) |
|
elif dynamic_callbacks_type == "failure": |
|
if self.dynamic_async_failure_callbacks is None: |
|
self.dynamic_async_failure_callbacks = [] |
|
self.dynamic_async_failure_callbacks.append(callback_class) |
|
else: |
|
processed_list.append(callback) |
|
return processed_list |
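    # Sketch of how dynamic callbacks are processed (illustrative, assuming "langfuse" is a
    # known custom-logger-compatible callback in this install):
    #
    #   logging_obj = Logging(..., dynamic_success_callbacks=["langfuse"], ...)
    #   # process_dynamic_callbacks() replaces the "langfuse" string with an initialized
    #   # CustomLogger instance and also appends it to dynamic_async_success_callbacks,
    #   # so both the sync and async success paths pick it up.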
|
|
|
def initialize_standard_callback_dynamic_params( |
|
self, kwargs: Optional[Dict] = None |
|
) -> StandardCallbackDynamicParams: |
|
""" |
|
Initialize the standard callback dynamic params from the kwargs |
|
|
|
checks if langfuse_secret_key, gcs_bucket_name in kwargs and sets the corresponding attributes in StandardCallbackDynamicParams |
|
""" |
|
return _initialize_standard_callback_dynamic_params(kwargs) |
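    # Illustrative kwargs (a sketch; the values below are made up), showing which fields are
    # picked up as standard callback dynamic params:
    #
    #   kwargs = {"langfuse_secret_key": "sk-...", "gcs_bucket_name": "my-log-bucket"}
    #   params = self.initialize_standard_callback_dynamic_params(kwargs)
    #   # params now carries the per-request Langfuse credentials / GCS bucket, so a
    #   # request-scoped logger can be built instead of relying on the global one.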
|
|
|
def update_environment_variables( |
|
self, |
|
litellm_params: Dict, |
|
optional_params: Dict, |
|
model: Optional[str] = None, |
|
user: Optional[str] = None, |
|
**additional_params, |
|
): |
|
self.optional_params = optional_params |
|
if model is not None: |
|
self.model = model |
|
self.user = user |
|
self.litellm_params = { |
|
**self.litellm_params, |
|
**scrub_sensitive_keys_in_metadata(litellm_params), |
|
} |
|
self.logger_fn = litellm_params.get("logger_fn", None) |
|
verbose_logger.debug(f"self.optional_params: {self.optional_params}") |
|
|
|
self.model_call_details.update( |
|
{ |
|
"model": self.model, |
|
"messages": self.messages, |
|
"optional_params": self.optional_params, |
|
"litellm_params": self.litellm_params, |
|
"start_time": self.start_time, |
|
"stream": self.stream, |
|
"user": user, |
|
"call_type": str(self.call_type), |
|
"litellm_call_id": self.litellm_call_id, |
|
"completion_start_time": self.completion_start_time, |
|
"standard_callback_dynamic_params": self.standard_callback_dynamic_params, |
|
**self.optional_params, |
|
**additional_params, |
|
} |
|
) |
|
|
|
|
|
if "stream_options" in additional_params: |
|
self.stream_options = additional_params["stream_options"] |
|
|
|
if ( |
|
litellm_params.get("input_cost_per_token") is not None |
|
or litellm_params.get("input_cost_per_second") is not None |
|
or litellm_params.get("output_cost_per_token") is not None |
|
or litellm_params.get("output_cost_per_second") is not None |
|
): |
|
self.custom_pricing = True |
|
|
|
if "custom_llm_provider" in self.model_call_details: |
|
self.custom_llm_provider = self.model_call_details["custom_llm_provider"] |
|
|
|
def get_chat_completion_prompt( |
|
self, |
|
model: str, |
|
messages: List[AllMessageValues], |
|
non_default_params: dict, |
|
prompt_id: str, |
|
prompt_variables: Optional[dict], |
|
) -> Tuple[str, List[AllMessageValues], dict]: |
|
|
|
for ( |
|
custom_logger_compatible_callback |
|
) in litellm._known_custom_logger_compatible_callbacks: |
|
if model.startswith(custom_logger_compatible_callback): |
|
custom_logger = _init_custom_logger_compatible_class( |
|
logging_integration=custom_logger_compatible_callback, |
|
internal_usage_cache=None, |
|
llm_router=None, |
|
) |
|
|
|
if custom_logger is None: |
|
continue |
|
old_name = model |
|
|
|
model, messages, non_default_params = ( |
|
custom_logger.get_chat_completion_prompt( |
|
model=model, |
|
messages=messages, |
|
non_default_params=non_default_params, |
|
prompt_id=prompt_id, |
|
prompt_variables=prompt_variables, |
|
dynamic_callback_params=self.standard_callback_dynamic_params, |
|
) |
|
) |
|
self.model_call_details["prompt_integration"] = old_name.split("/")[0] |
|
self.messages = messages |
|
|
|
return model, messages, non_default_params |
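    # Sketch of the prompt-management flow (illustrative; the model name below is hypothetical):
    # calling with model="langfuse/my-prompt" matches the "langfuse" custom-logger-compatible
    # callback, so that integration's get_chat_completion_prompt() resolves the real model +
    # messages, and model_call_details["prompt_integration"] is set to "langfuse".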
|
|
|
def _pre_call(self, input, api_key, model=None, additional_args={}): |
|
""" |
|
Common helper function across the sync + async pre-call function |
|
""" |
|
|
|
self.model_call_details["input"] = input |
|
self.model_call_details["api_key"] = api_key |
|
self.model_call_details["additional_args"] = additional_args |
|
self.model_call_details["log_event_type"] = "pre_api_call" |
|
        if model:
            self.model_call_details["model"] = model
|
|
|
def pre_call(self, input, api_key, model=None, additional_args={}): |
|
|
|
litellm.error_logs["PRE_CALL"] = locals() |
|
try: |
|
self._pre_call( |
|
input=input, |
|
api_key=api_key, |
|
model=model, |
|
additional_args=additional_args, |
|
) |
|
|
|
|
|
self._print_llm_call_debugging_log( |
|
api_base=additional_args.get("api_base", ""), |
|
headers=additional_args.get("headers", {}), |
|
additional_args=additional_args, |
|
) |
|
|
|
if log_raw_request_response is True: |
|
_litellm_params = self.model_call_details.get("litellm_params", {}) |
|
_metadata = _litellm_params.get("metadata", {}) or {} |
|
try: |
|
|
|
if ( |
|
turn_off_message_logging is not None |
|
and turn_off_message_logging is True |
|
): |
|
_metadata["raw_request"] = ( |
|
"redacted by litellm. \ |
|
'litellm.turn_off_message_logging=True'" |
|
) |
|
else: |
|
curl_command = self._get_request_curl_command( |
|
api_base=additional_args.get("api_base", ""), |
|
headers=additional_args.get("headers", {}), |
|
additional_args=additional_args, |
|
data=additional_args.get("complete_input_dict", {}), |
|
) |
|
_metadata["raw_request"] = str(curl_command) |
|
except Exception as e: |
|
_metadata["raw_request"] = ( |
|
"Unable to Log \ |
|
raw request: {}".format( |
|
str(e) |
|
) |
|
) |
|
if self.logger_fn and callable(self.logger_fn): |
|
try: |
|
self.logger_fn( |
|
self.model_call_details |
|
) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( |
|
str(e) |
|
) |
|
) |
|
|
|
self.model_call_details["api_call_start_time"] = datetime.datetime.now() |
|
|
|
callbacks = litellm.input_callback + (self.dynamic_input_callbacks or []) |
|
for callback in callbacks: |
|
try: |
|
if callback == "supabase" and supabaseClient is not None: |
|
verbose_logger.debug("reaches supabase for logging!") |
|
model = self.model_call_details["model"] |
|
messages = self.model_call_details["input"] |
|
verbose_logger.debug(f"supabaseClient: {supabaseClient}") |
|
supabaseClient.input_log_event( |
|
model=model, |
|
messages=messages, |
|
end_user=self.model_call_details.get("user", "default"), |
|
litellm_call_id=self.litellm_params["litellm_call_id"], |
|
print_verbose=print_verbose, |
|
) |
|
elif callback == "sentry" and add_breadcrumb: |
|
try: |
|
details_to_log = copy.deepcopy(self.model_call_details) |
|
except Exception: |
|
details_to_log = self.model_call_details |
|
if litellm.turn_off_message_logging: |
|
|
|
details_to_log.pop("messages", None) |
|
details_to_log.pop("input", None) |
|
details_to_log.pop("prompt", None) |
|
|
|
add_breadcrumb( |
|
category="litellm.llm_call", |
|
message=f"Model Call Details pre-call: {details_to_log}", |
|
level="info", |
|
) |
|
|
|
elif isinstance(callback, CustomLogger): |
|
callback.log_pre_api_call( |
|
model=self.model, |
|
messages=self.messages, |
|
kwargs=self.model_call_details, |
|
) |
|
                    elif callable(callback) and customLogger is not None:
|
customLogger.log_input_event( |
|
model=self.model, |
|
messages=self.messages, |
|
kwargs=self.model_call_details, |
|
print_verbose=print_verbose, |
|
callback_func=callback, |
|
) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"litellm.Logging.pre_call(): Exception occured - {}".format( |
|
str(e) |
|
) |
|
) |
|
verbose_logger.debug( |
|
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" |
|
) |
|
if capture_exception: |
|
capture_exception(e) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( |
|
str(e) |
|
) |
|
) |
|
verbose_logger.error( |
|
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" |
|
) |
|
if capture_exception: |
|
capture_exception(e) |
|
|
|
def _print_llm_call_debugging_log( |
|
self, |
|
api_base: str, |
|
headers: dict, |
|
additional_args: dict, |
|
): |
|
""" |
|
Internal debugging helper function |
|
|
|
Prints the RAW curl command sent from LiteLLM |
|
""" |
|
if _is_debugging_on(): |
|
if json_logs: |
|
masked_headers = self._get_masked_headers(headers) |
|
verbose_logger.debug( |
|
"POST Request Sent from LiteLLM", |
|
extra={"api_base": {api_base}, **masked_headers}, |
|
) |
|
else: |
|
headers = additional_args.get("headers", {}) |
|
if headers is None: |
|
headers = {} |
|
data = additional_args.get("complete_input_dict", {}) |
|
api_base = str(additional_args.get("api_base", "")) |
|
if "key=" in api_base: |
|
|
|
key_index = api_base.find("key=") + 4 |
|
|
|
masked_api_base = api_base[:key_index] + "*" * 5 + api_base[-4:] |
|
else: |
|
masked_api_base = api_base |
|
self.model_call_details["litellm_params"]["api_base"] = masked_api_base |
|
|
|
curl_command = self._get_request_curl_command( |
|
api_base=api_base, |
|
headers=headers, |
|
additional_args=additional_args, |
|
data=data, |
|
) |
|
verbose_logger.debug(f"\033[92m{curl_command}\033[0m\n") |
|
|
|
def _get_request_curl_command( |
|
self, api_base: str, headers: dict, additional_args: dict, data: dict |
|
) -> str: |
|
curl_command = "\n\nPOST Request Sent from LiteLLM:\n" |
|
curl_command += "curl -X POST \\\n" |
|
curl_command += f"{api_base} \\\n" |
|
masked_headers = self._get_masked_headers(headers) |
|
formatted_headers = " ".join( |
|
[f"-H '{k}: {v}'" for k, v in masked_headers.items()] |
|
) |
|
|
|
curl_command += ( |
|
f"{formatted_headers} \\\n" if formatted_headers.strip() != "" else "" |
|
) |
|
curl_command += f"-d '{str(data)}'\n" |
|
if additional_args.get("request_str", None) is not None: |
|
|
|
curl_command = "\nRequest Sent from LiteLLM:\n" |
|
curl_command += additional_args.get("request_str", None) |
|
elif api_base == "": |
|
curl_command = str(self.model_call_details) |
|
return curl_command |
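    # Example output shape (illustrative; the url, header, and body below are made up):
    #
    #   POST Request Sent from LiteLLM:
    #   curl -X POST \
    #   https://api.example.com/v1/chat/completions \
    #   -H 'Authorization: *****' \
    #   -d '{"model": "my-model", "messages": [...]}'
    #
    # If additional_args carries a provider-formatted "request_str", that string is returned
    # instead of the assembled curl command.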
|
|
|
def _get_masked_headers(self, headers: dict): |
|
""" |
|
Internal debugging helper function |
|
|
|
Masks the headers of the request sent from LiteLLM |
|
""" |
|
return { |
|
k: ( |
|
(v[:-44] + "*" * 44) |
|
if (isinstance(v, str) and len(v) > 44) |
|
else "*****" |
|
) |
|
for k, v in headers.items() |
|
} |
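    # Masking sketch (the header values below are made up): a value longer than 44 characters
    # keeps its prefix and has its last 44 characters replaced with "*"; anything shorter is
    # replaced entirely, e.g. {"Authorization": "Bearer abc"} -> {"Authorization": "*****"}.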
|
|
|
def post_call( |
|
self, original_response, input=None, api_key=None, additional_args={} |
|
): |
|
|
|
litellm.error_logs["POST_CALL"] = locals() |
|
if isinstance(original_response, dict): |
|
original_response = json.dumps(original_response) |
|
try: |
|
self.model_call_details["input"] = input |
|
self.model_call_details["api_key"] = api_key |
|
self.model_call_details["original_response"] = original_response |
|
self.model_call_details["additional_args"] = additional_args |
|
self.model_call_details["log_event_type"] = "post_api_call" |
|
|
|
if json_logs: |
|
verbose_logger.debug( |
|
"RAW RESPONSE:\n{}\n\n".format( |
|
self.model_call_details.get( |
|
"original_response", self.model_call_details |
|
) |
|
), |
|
) |
|
else: |
|
print_verbose( |
|
"RAW RESPONSE:\n{}\n\n".format( |
|
self.model_call_details.get( |
|
"original_response", self.model_call_details |
|
) |
|
) |
|
) |
|
if self.logger_fn and callable(self.logger_fn): |
|
try: |
|
self.logger_fn( |
|
self.model_call_details |
|
) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( |
|
str(e) |
|
) |
|
) |
|
original_response = redact_message_input_output_from_logging( |
|
model_call_details=( |
|
self.model_call_details |
|
if hasattr(self, "model_call_details") |
|
else {} |
|
), |
|
result=original_response, |
|
) |
|
|
|
|
|
callbacks = litellm.input_callback + (self.dynamic_input_callbacks or []) |
|
for callback in callbacks: |
|
try: |
|
if callback == "sentry" and add_breadcrumb: |
|
verbose_logger.debug("reaches sentry breadcrumbing") |
|
try: |
|
details_to_log = copy.deepcopy(self.model_call_details) |
|
except Exception: |
|
details_to_log = self.model_call_details |
|
if litellm.turn_off_message_logging: |
|
|
|
details_to_log.pop("messages", None) |
|
details_to_log.pop("input", None) |
|
details_to_log.pop("prompt", None) |
|
|
|
add_breadcrumb( |
|
category="litellm.llm_call", |
|
message=f"Model Call Details post-call: {details_to_log}", |
|
level="info", |
|
) |
|
elif isinstance(callback, CustomLogger): |
|
callback.log_post_api_call( |
|
kwargs=self.model_call_details, |
|
response_obj=None, |
|
start_time=self.start_time, |
|
end_time=None, |
|
) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {}".format( |
|
str(e) |
|
) |
|
) |
|
verbose_logger.debug( |
|
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" |
|
) |
|
if capture_exception: |
|
capture_exception(e) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( |
|
str(e) |
|
) |
|
) |
|
|
|
def get_response_ms(self) -> float: |
|
return ( |
|
self.model_call_details.get("end_time", datetime.datetime.now()) |
|
- self.model_call_details.get("start_time", datetime.datetime.now()) |
|
).total_seconds() * 1000 |
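    # e.g. start_time=10:00:00.000 and end_time=10:00:01.250 -> 1250.0 ms; if either timestamp
    # is missing, datetime.datetime.now() is substituted, so the result degrades toward ~0.0.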
|
|
|
def _response_cost_calculator( |
|
self, |
|
result: Union[ |
|
ModelResponse, |
|
ModelResponseStream, |
|
EmbeddingResponse, |
|
ImageResponse, |
|
TranscriptionResponse, |
|
TextCompletionResponse, |
|
HttpxBinaryResponseContent, |
|
RerankResponse, |
|
Batch, |
|
FineTuningJob, |
|
], |
|
cache_hit: Optional[bool] = None, |
|
) -> Optional[float]: |
|
""" |
|
Calculate response cost using result + logging object variables. |
|
|
|
used for consistent cost calculation across response headers + logging integrations. |
|
""" |
|
|
|
|
|
custom_pricing = use_custom_pricing_for_model( |
|
litellm_params=( |
|
self.litellm_params if hasattr(self, "litellm_params") else None |
|
) |
|
) |
|
|
|
prompt = "" |
|
_input = self.model_call_details.get("input", None) |
|
if _input is not None and isinstance(_input, str): |
|
prompt = _input |
|
|
|
if cache_hit is None: |
|
cache_hit = self.model_call_details.get("cache_hit", False) |
|
|
|
try: |
|
response_cost_calculator_kwargs = { |
|
"response_object": result, |
|
"model": self.model, |
|
"cache_hit": cache_hit, |
|
"custom_llm_provider": self.model_call_details.get( |
|
"custom_llm_provider", None |
|
), |
|
"base_model": _get_base_model_from_metadata( |
|
model_call_details=self.model_call_details |
|
), |
|
"call_type": self.call_type, |
|
"optional_params": self.optional_params, |
|
"custom_pricing": custom_pricing, |
|
"prompt": prompt, |
|
} |
|
except Exception as e: |
|
debug_info = StandardLoggingModelCostFailureDebugInformation( |
|
error_str=str(e), |
|
traceback_str=traceback.format_exc(), |
|
) |
|
verbose_logger.debug( |
|
f"response_cost_failure_debug_information: {debug_info}" |
|
) |
|
self.model_call_details["response_cost_failure_debug_information"] = ( |
|
debug_info |
|
) |
|
return None |
|
|
|
try: |
|
response_cost = litellm.response_cost_calculator( |
|
**response_cost_calculator_kwargs |
|
) |
|
verbose_logger.debug(f"response_cost: {response_cost}") |
|
return response_cost |
|
except Exception as e: |
|
debug_info = StandardLoggingModelCostFailureDebugInformation( |
|
error_str=str(e), |
|
traceback_str=_get_traceback_str_for_error(str(e)), |
|
model=response_cost_calculator_kwargs["model"], |
|
cache_hit=response_cost_calculator_kwargs["cache_hit"], |
|
custom_llm_provider=response_cost_calculator_kwargs[ |
|
"custom_llm_provider" |
|
], |
|
base_model=response_cost_calculator_kwargs["base_model"], |
|
call_type=response_cost_calculator_kwargs["call_type"], |
|
custom_pricing=response_cost_calculator_kwargs["custom_pricing"], |
|
) |
|
verbose_logger.debug( |
|
f"response_cost_failure_debug_information: {debug_info}" |
|
) |
|
self.model_call_details["response_cost_failure_debug_information"] = ( |
|
debug_info |
|
) |
|
|
|
return None |
|
|
|
def should_run_callback( |
|
self, callback: litellm.CALLBACK_TYPES, litellm_params: dict, event_hook: str |
|
) -> bool: |
|
|
|
if litellm.global_disable_no_log_param: |
|
return True |
|
|
|
if litellm_params.get("no-log", False) is True: |
|
|
|
|
|
if not ( |
|
isinstance(callback, CustomLogger) |
|
and "_PROXY_" in callback.__class__.__name__ |
|
): |
|
verbose_logger.debug( |
|
f"no-log request, skipping logging for {event_hook} event" |
|
) |
|
return False |
|
return True |
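    # Behaviour sketch (illustrative; "my_logger" below is a hypothetical CustomLogger):
    #
    #   should_run_callback(callback=my_logger, litellm_params={"no-log": True}, event_hook="success_handler")
    #   # -> False for a plain CustomLogger, True for proxy-internal ("_PROXY_") callbacks,
    #   #    and always True when litellm.global_disable_no_log_param is set.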
|
|
|
def _success_handler_helper_fn( |
|
self, |
|
result=None, |
|
start_time=None, |
|
end_time=None, |
|
cache_hit=None, |
|
standard_logging_object: Optional[StandardLoggingPayload] = None, |
|
): |
|
try: |
|
if start_time is None: |
|
start_time = self.start_time |
|
if end_time is None: |
|
end_time = datetime.datetime.now() |
|
if self.completion_start_time is None: |
|
self.completion_start_time = end_time |
|
self.model_call_details["completion_start_time"] = ( |
|
self.completion_start_time |
|
) |
|
self.model_call_details["log_event_type"] = "successful_api_call" |
|
self.model_call_details["end_time"] = end_time |
|
self.model_call_details["cache_hit"] = cache_hit |
|
|
|
|
|
if ( |
|
standard_logging_object is None |
|
and result is not None |
|
and self.stream is not True |
|
): |
|
                if isinstance(
                    result,
                    (
                        ModelResponse,
                        ModelResponseStream,
                        EmbeddingResponse,
                        ImageResponse,
                        TranscriptionResponse,
                        TextCompletionResponse,
                        HttpxBinaryResponseContent,
                        RerankResponse,
                        Batch,
                        FineTuningJob,
                    ),
                ):
|
|
|
hidden_params = getattr(result, "_hidden_params", {}) |
|
if hidden_params: |
|
|
|
if self.model_call_details.get("litellm_params") is not None: |
|
self.model_call_details["litellm_params"].setdefault( |
|
"metadata", {} |
|
) |
|
if ( |
|
self.model_call_details["litellm_params"]["metadata"] |
|
is None |
|
): |
|
self.model_call_details["litellm_params"][ |
|
"metadata" |
|
] = {} |
|
|
|
self.model_call_details["litellm_params"]["metadata"][ |
|
"hidden_params" |
|
] = getattr( |
|
result, "_hidden_params", {} |
|
) |
|
|
|
if "response_cost" in hidden_params: |
|
self.model_call_details["response_cost"] = hidden_params[ |
|
"response_cost" |
|
] |
|
else: |
|
self.model_call_details["response_cost"] = ( |
|
self._response_cost_calculator(result=result) |
|
) |
|
|
|
|
|
self.model_call_details["standard_logging_object"] = ( |
|
get_standard_logging_object_payload( |
|
kwargs=self.model_call_details, |
|
init_response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
logging_obj=self, |
|
status="success", |
|
) |
|
) |
|
elif isinstance(result, dict): |
|
|
|
self.model_call_details["standard_logging_object"] = ( |
|
get_standard_logging_object_payload( |
|
kwargs=self.model_call_details, |
|
init_response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
logging_obj=self, |
|
status="success", |
|
) |
|
) |
|
elif standard_logging_object is not None: |
|
self.model_call_details["standard_logging_object"] = ( |
|
standard_logging_object |
|
) |
|
else: |
|
self.model_call_details["response_cost"] = None |
|
|
|
if ( |
|
litellm.max_budget |
|
and self.stream is False |
|
and result is not None |
|
and isinstance(result, dict) |
|
and "content" in result |
|
): |
|
time_diff = (end_time - start_time).total_seconds() |
|
float_diff = float(time_diff) |
|
litellm._current_cost += litellm.completion_cost( |
|
model=self.model, |
|
prompt="", |
|
completion=getattr(result, "content", ""), |
|
total_time=float_diff, |
|
) |
|
|
|
return start_time, end_time, result |
|
except Exception as e: |
|
raise Exception(f"[Non-Blocking] LiteLLM.Success_Call Error: {str(e)}") |
|
|
|
def success_handler( |
|
self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs |
|
): |
|
verbose_logger.debug( |
|
f"Logging Details LiteLLM-Success Call: Cache_hit={cache_hit}" |
|
) |
|
start_time, end_time, result = self._success_handler_helper_fn( |
|
start_time=start_time, |
|
end_time=end_time, |
|
result=result, |
|
cache_hit=cache_hit, |
|
standard_logging_object=kwargs.get("standard_logging_object", None), |
|
) |
|
try: |
|
|
|
|
|
complete_streaming_response: Optional[ |
|
Union[ModelResponse, TextCompletionResponse] |
|
] = None |
|
if "complete_streaming_response" in self.model_call_details: |
|
return |
|
complete_streaming_response = self._get_assembled_streaming_response( |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
is_async=False, |
|
streaming_chunks=self.sync_streaming_chunks, |
|
) |
|
if complete_streaming_response is not None: |
|
verbose_logger.debug( |
|
"Logging Details LiteLLM-Success Call streaming complete" |
|
) |
|
self.model_call_details["complete_streaming_response"] = ( |
|
complete_streaming_response |
|
) |
|
self.model_call_details["response_cost"] = ( |
|
self._response_cost_calculator(result=complete_streaming_response) |
|
) |
|
|
|
self.model_call_details["standard_logging_object"] = ( |
|
get_standard_logging_object_payload( |
|
kwargs=self.model_call_details, |
|
init_response_obj=complete_streaming_response, |
|
start_time=start_time, |
|
end_time=end_time, |
|
logging_obj=self, |
|
status="success", |
|
) |
|
) |
|
callbacks = self.get_combined_callback_list( |
|
dynamic_success_callbacks=self.dynamic_success_callbacks, |
|
global_callbacks=litellm.success_callback, |
|
) |
|
|
|
|
|
result = redact_message_input_output_from_logging( |
|
model_call_details=( |
|
self.model_call_details |
|
if hasattr(self, "model_call_details") |
|
else {} |
|
), |
|
result=result, |
|
) |
|
|
|
for callback in callbacks: |
|
if isinstance(callback, CustomLogger): |
|
|
|
self.model_call_details, result = callback.logging_hook( |
|
kwargs=self.model_call_details, |
|
result=result, |
|
call_type=self.call_type, |
|
) |
|
|
|
for callback in callbacks: |
|
try: |
|
litellm_params = self.model_call_details.get("litellm_params", {}) |
|
should_run = self.should_run_callback( |
|
callback=callback, |
|
litellm_params=litellm_params, |
|
event_hook="success_handler", |
|
) |
|
if not should_run: |
|
continue |
|
if callback == "promptlayer" and promptLayerLogger is not None: |
|
print_verbose("reaches promptlayer for logging!") |
|
promptLayerLogger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "supabase" and supabaseClient is not None: |
|
print_verbose("reaches supabase for logging!") |
|
kwargs = self.model_call_details |
|
|
|
|
|
if self.stream: |
|
if "complete_streaming_response" not in kwargs: |
|
continue |
|
else: |
|
print_verbose("reaches supabase for streaming logging!") |
|
result = kwargs["complete_streaming_response"] |
|
|
|
model = kwargs["model"] |
|
messages = kwargs["messages"] |
|
optional_params = kwargs.get("optional_params", {}) |
|
litellm_params = kwargs.get("litellm_params", {}) |
|
supabaseClient.log_event( |
|
model=model, |
|
messages=messages, |
|
end_user=optional_params.get("user", "default"), |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
litellm_call_id=litellm_params.get( |
|
"litellm_call_id", str(uuid.uuid4()) |
|
), |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "wandb" and weightsBiasesLogger is not None: |
|
print_verbose("reaches wandb for logging!") |
|
weightsBiasesLogger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "logfire" and logfireLogger is not None: |
|
verbose_logger.debug("reaches logfire for success logging!") |
|
kwargs = {} |
|
for k, v in self.model_call_details.items(): |
|
if ( |
|
k != "original_response" |
|
): |
|
kwargs[k] = v |
|
|
|
|
|
if self.stream: |
|
if "complete_streaming_response" not in kwargs: |
|
continue |
|
else: |
|
print_verbose("reaches logfire for streaming logging!") |
|
result = kwargs["complete_streaming_response"] |
|
|
|
logfireLogger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
level=LogfireLevel.INFO.value, |
|
) |
|
|
|
if callback == "lunary" and lunaryLogger is not None: |
|
print_verbose("reaches lunary for logging!") |
|
model = self.model |
|
kwargs = self.model_call_details |
|
|
|
input = kwargs.get("messages", kwargs.get("input", None)) |
|
|
|
type = ( |
|
"embed" |
|
if self.call_type == CallTypes.embedding.value |
|
else "llm" |
|
) |
|
|
|
|
|
if self.stream: |
|
if "complete_streaming_response" not in kwargs: |
|
continue |
|
else: |
|
result = kwargs["complete_streaming_response"] |
|
|
|
lunaryLogger.log_event( |
|
type=type, |
|
kwargs=kwargs, |
|
event="end", |
|
model=model, |
|
input=input, |
|
user_id=kwargs.get("user", None), |
|
|
|
extra=kwargs.get("optional_params", {}), |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
run_id=self.litellm_call_id, |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "helicone" and heliconeLogger is not None: |
|
print_verbose("reaches helicone for logging!") |
|
model = self.model |
|
messages = self.model_call_details["input"] |
|
kwargs = self.model_call_details |
|
|
|
|
|
if self.stream: |
|
if "complete_streaming_response" not in kwargs: |
|
continue |
|
else: |
|
print_verbose("reaches helicone for streaming logging!") |
|
result = kwargs["complete_streaming_response"] |
|
|
|
heliconeLogger.log_success( |
|
model=model, |
|
messages=messages, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
kwargs=kwargs, |
|
) |
|
if callback == "langfuse": |
|
global langFuseLogger |
|
print_verbose("reaches langfuse for success logging!") |
|
kwargs = {} |
|
for k, v in self.model_call_details.items(): |
|
if ( |
|
k != "original_response" |
|
): |
|
kwargs[k] = v |
|
|
|
if self.stream: |
|
verbose_logger.debug( |
|
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" |
|
) |
|
if complete_streaming_response is None: |
|
continue |
|
else: |
|
print_verbose("reaches langfuse for streaming logging!") |
|
result = kwargs["complete_streaming_response"] |
|
|
|
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( |
|
globalLangfuseLogger=langFuseLogger, |
|
standard_callback_dynamic_params=self.standard_callback_dynamic_params, |
|
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, |
|
) |
|
if langfuse_logger_to_use is not None: |
|
_response = langfuse_logger_to_use._old_log_event( |
|
kwargs=kwargs, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
user_id=kwargs.get("user", None), |
|
print_verbose=print_verbose, |
|
) |
|
if _response is not None and isinstance(_response, dict): |
|
_trace_id = _response.get("trace_id", None) |
|
if _trace_id is not None: |
|
in_memory_trace_id_cache.set_cache( |
|
litellm_call_id=self.litellm_call_id, |
|
service_name="langfuse", |
|
trace_id=_trace_id, |
|
) |
|
if callback == "generic": |
|
global genericAPILogger |
|
verbose_logger.debug("reaches langfuse for success logging!") |
|
kwargs = {} |
|
for k, v in self.model_call_details.items(): |
|
if ( |
|
k != "original_response" |
|
): |
|
kwargs[k] = v |
|
|
|
if self.stream: |
|
verbose_logger.debug( |
|
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" |
|
) |
|
if complete_streaming_response is None: |
|
continue |
|
else: |
|
print_verbose("reaches langfuse for streaming logging!") |
|
result = kwargs["complete_streaming_response"] |
|
if genericAPILogger is None: |
|
genericAPILogger = GenericAPILogger() |
|
genericAPILogger.log_event( |
|
kwargs=kwargs, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
user_id=kwargs.get("user", None), |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "greenscale" and greenscaleLogger is not None: |
|
kwargs = {} |
|
for k, v in self.model_call_details.items(): |
|
if ( |
|
k != "original_response" |
|
): |
|
kwargs[k] = v |
|
|
|
if self.stream: |
|
verbose_logger.debug( |
|
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" |
|
) |
|
if complete_streaming_response is None: |
|
continue |
|
else: |
|
print_verbose( |
|
"reaches greenscale for streaming logging!" |
|
) |
|
result = kwargs["complete_streaming_response"] |
|
|
|
greenscaleLogger.log_event( |
|
kwargs=kwargs, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "athina" and athinaLogger is not None: |
|
deep_copy = {} |
|
for k, v in self.model_call_details.items(): |
|
deep_copy[k] = v |
|
athinaLogger.log_event( |
|
kwargs=deep_copy, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "traceloop": |
|
deep_copy = {} |
|
for k, v in self.model_call_details.items(): |
|
if k != "original_response": |
|
deep_copy[k] = v |
|
traceloopLogger.log_event( |
|
kwargs=deep_copy, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
user_id=kwargs.get("user", None), |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "s3": |
|
global s3Logger |
|
if s3Logger is None: |
|
s3Logger = S3Logger() |
|
if self.stream: |
|
if "complete_streaming_response" in self.model_call_details: |
|
print_verbose( |
|
"S3Logger Logger: Got Stream Event - Completed Stream Response" |
|
) |
|
s3Logger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=self.model_call_details[ |
|
"complete_streaming_response" |
|
], |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
else: |
|
print_verbose( |
|
"S3Logger Logger: Got Stream Event - No complete stream response as yet" |
|
) |
|
else: |
|
s3Logger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
|
|
if ( |
|
callback == "openmeter" |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"acompletion", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aembedding", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aimage_generation", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"atranscription", False |
|
) |
|
is not True |
|
): |
|
global openMeterLogger |
|
if openMeterLogger is None: |
|
print_verbose("Instantiates openmeter client") |
|
openMeterLogger = OpenMeterLogger() |
|
if self.stream and complete_streaming_response is None: |
|
openMeterLogger.log_stream_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
else: |
|
if self.stream and complete_streaming_response: |
|
self.model_call_details["complete_response"] = ( |
|
self.model_call_details.get( |
|
"complete_streaming_response", {} |
|
) |
|
) |
|
result = self.model_call_details["complete_response"] |
|
openMeterLogger.log_success_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
|
|
if ( |
|
isinstance(callback, CustomLogger) |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"acompletion", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aembedding", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aimage_generation", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"atranscription", False |
|
) |
|
is not True |
|
and self.call_type |
|
!= CallTypes.pass_through.value |
|
): |
|
if self.stream and complete_streaming_response is None: |
|
callback.log_stream_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
else: |
|
if self.stream and complete_streaming_response: |
|
self.model_call_details["complete_response"] = ( |
|
self.model_call_details.get( |
|
"complete_streaming_response", {} |
|
) |
|
) |
|
result = self.model_call_details["complete_response"] |
|
|
|
callback.log_success_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
if ( |
|
callable(callback) is True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"acompletion", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aembedding", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aimage_generation", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"atranscription", False |
|
) |
|
is not True |
|
and customLogger is not None |
|
): |
|
print_verbose( |
|
"success callbacks: Running Custom Callback Function - {}".format( |
|
callback |
|
) |
|
) |
|
|
|
customLogger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
callback_func=callback, |
|
) |
|
|
|
except Exception as e: |
|
print_verbose( |
|
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging with integrations {traceback.format_exc()}" |
|
) |
|
print_verbose( |
|
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" |
|
) |
|
if capture_exception: |
|
capture_exception(e) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {}".format( |
|
str(e) |
|
), |
|
) |
|
|
|
async def async_success_handler( |
|
self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs |
|
): |
|
""" |
|
Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions. |
|
""" |
|
print_verbose( |
|
"Logging Details LiteLLM-Async Success Call, cache_hit={}".format(cache_hit) |
|
) |
|
start_time, end_time, result = self._success_handler_helper_fn( |
|
start_time=start_time, |
|
end_time=end_time, |
|
result=result, |
|
cache_hit=cache_hit, |
|
standard_logging_object=kwargs.get("standard_logging_object", None), |
|
) |
|
|
|
if "async_complete_streaming_response" in self.model_call_details: |
|
return |
|
complete_streaming_response: Optional[ |
|
Union[ModelResponse, TextCompletionResponse] |
|
] = self._get_assembled_streaming_response( |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
is_async=True, |
|
streaming_chunks=self.streaming_chunks, |
|
) |
|
|
|
if complete_streaming_response is not None: |
|
print_verbose("Async success callbacks: Got a complete streaming response") |
|
|
|
self.model_call_details["async_complete_streaming_response"] = ( |
|
complete_streaming_response |
|
) |
|
try: |
|
if self.model_call_details.get("cache_hit", False) is True: |
|
self.model_call_details["response_cost"] = 0.0 |
|
else: |
|
|
|
_get_base_model_from_metadata( |
|
model_call_details=self.model_call_details |
|
) |
|
|
|
self.model_call_details["response_cost"] = ( |
|
self._response_cost_calculator( |
|
result=complete_streaming_response |
|
) |
|
) |
|
|
|
verbose_logger.debug( |
|
f"Model={self.model}; cost={self.model_call_details['response_cost']}" |
|
) |
|
except litellm.NotFoundError: |
|
verbose_logger.warning( |
|
f"Model={self.model} not found in completion cost map. Setting 'response_cost' to None" |
|
) |
|
self.model_call_details["response_cost"] = None |
|
|
|
|
|
self.model_call_details["standard_logging_object"] = ( |
|
get_standard_logging_object_payload( |
|
kwargs=self.model_call_details, |
|
init_response_obj=complete_streaming_response, |
|
start_time=start_time, |
|
end_time=end_time, |
|
logging_obj=self, |
|
status="success", |
|
) |
|
) |
|
callbacks = self.get_combined_callback_list( |
|
dynamic_success_callbacks=self.dynamic_async_success_callbacks, |
|
global_callbacks=litellm._async_success_callback, |
|
) |
|
|
|
result = redact_message_input_output_from_logging( |
|
model_call_details=( |
|
self.model_call_details if hasattr(self, "model_call_details") else {} |
|
), |
|
result=result, |
|
) |
|
|
|
|
|
|
|
for callback in callbacks: |
|
if isinstance(callback, CustomGuardrail): |
|
from litellm.types.guardrails import GuardrailEventHooks |
|
|
|
if ( |
|
callback.should_run_guardrail( |
|
data=self.model_call_details, |
|
event_type=GuardrailEventHooks.logging_only, |
|
) |
|
is not True |
|
): |
|
continue |
|
|
|
self.model_call_details, result = await callback.async_logging_hook( |
|
kwargs=self.model_call_details, |
|
result=result, |
|
call_type=self.call_type, |
|
) |
|
elif isinstance(callback, CustomLogger): |
|
result = redact_message_input_output_from_custom_logger( |
|
result=result, litellm_logging_obj=self, custom_logger=callback |
|
) |
|
self.model_call_details, result = await callback.async_logging_hook( |
|
kwargs=self.model_call_details, |
|
result=result, |
|
call_type=self.call_type, |
|
) |
|
|
|
for callback in callbacks: |
|
|
|
litellm_params = self.model_call_details.get("litellm_params", {}) |
|
should_run = self.should_run_callback( |
|
callback=callback, |
|
litellm_params=litellm_params, |
|
event_hook="async_success_handler", |
|
) |
|
if not should_run: |
|
continue |
|
try: |
|
if callback == "openmeter" and openMeterLogger is not None: |
|
if self.stream is True: |
|
if ( |
|
"async_complete_streaming_response" |
|
in self.model_call_details |
|
): |
|
await openMeterLogger.async_log_success_event( |
|
kwargs=self.model_call_details, |
|
response_obj=self.model_call_details[ |
|
"async_complete_streaming_response" |
|
], |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
else: |
|
await openMeterLogger.async_log_stream_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
else: |
|
await openMeterLogger.async_log_success_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
if isinstance(callback, CustomLogger): |
|
if self.stream is True: |
|
if ( |
|
"async_complete_streaming_response" |
|
in self.model_call_details |
|
): |
|
await callback.async_log_success_event( |
|
kwargs=self.model_call_details, |
|
response_obj=self.model_call_details[ |
|
"async_complete_streaming_response" |
|
], |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
else: |
|
await callback.async_log_stream_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
else: |
|
await callback.async_log_success_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
if callable(callback): |
|
global customLogger |
|
if customLogger is None: |
|
customLogger = CustomLogger() |
|
if self.stream: |
|
if ( |
|
"async_complete_streaming_response" |
|
in self.model_call_details |
|
): |
|
await customLogger.async_log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=self.model_call_details[ |
|
"async_complete_streaming_response" |
|
], |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
callback_func=callback, |
|
) |
|
else: |
|
await customLogger.async_log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
callback_func=callback, |
|
) |
|
if callback == "dynamodb": |
|
global dynamoLogger |
|
if dynamoLogger is None: |
|
dynamoLogger = DyanmoDBLogger() |
|
if self.stream: |
|
if ( |
|
"async_complete_streaming_response" |
|
in self.model_call_details |
|
): |
|
print_verbose( |
|
"DynamoDB Logger: Got Stream Event - Completed Stream Response" |
|
) |
|
await dynamoLogger._async_log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=self.model_call_details[ |
|
"async_complete_streaming_response" |
|
], |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
else: |
|
print_verbose( |
|
"DynamoDB Logger: Got Stream Event - No complete stream response as yet" |
|
) |
|
else: |
|
await dynamoLogger._async_log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
except Exception: |
|
verbose_logger.error( |
|
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}" |
|
) |
|
pass |
|
|
|
def _failure_handler_helper_fn( |
|
self, exception, traceback_exception, start_time=None, end_time=None |
|
): |
|
if start_time is None: |
|
start_time = self.start_time |
|
if end_time is None: |
|
end_time = datetime.datetime.now() |
|
|
|
|
|
if not hasattr(self, "model_call_details"): |
|
self.model_call_details = {} |
|
|
|
self.model_call_details["log_event_type"] = "failed_api_call" |
|
self.model_call_details["exception"] = exception |
|
self.model_call_details["traceback_exception"] = traceback_exception |
|
self.model_call_details["end_time"] = end_time |
|
self.model_call_details.setdefault("original_response", None) |
|
self.model_call_details["response_cost"] = 0 |
|
|
|
if hasattr(exception, "headers") and isinstance(exception.headers, dict): |
|
self.model_call_details.setdefault("litellm_params", {}) |
|
metadata = ( |
|
self.model_call_details["litellm_params"].get("metadata", {}) or {} |
|
) |
|
metadata.update(exception.headers) |
|
|
|
|
|
|
|
self.model_call_details["standard_logging_object"] = ( |
|
get_standard_logging_object_payload( |
|
kwargs=self.model_call_details, |
|
init_response_obj={}, |
|
start_time=start_time, |
|
end_time=end_time, |
|
logging_obj=self, |
|
status="failure", |
|
error_str=str(exception), |
|
original_exception=exception, |
|
) |
|
) |
|
return start_time, end_time |
|
|
|
async def special_failure_handlers(self, exception: Exception): |
|
""" |
|
Custom events, emitted for specific failures. |
|
|
|
Currently just for router model group rate limit error |
|
""" |
|
from litellm.types.router import RouterErrors |
|
|
|
litellm_params: dict = self.model_call_details.get("litellm_params") or {} |
|
metadata = litellm_params.get("metadata") or {} |
|
|
|
|
|
is_base_case = False |
|
if metadata.get("model_group_size") is not None: |
|
model_group_size = metadata.get("model_group_size") |
|
if isinstance(model_group_size, int) and model_group_size == 1: |
|
is_base_case = True |
|
|
|
if ( |
|
RouterErrors.no_deployments_available.value not in str(exception) |
|
and is_base_case is False |
|
): |
|
return |
|
|
|
|
|
|
|
model_group = metadata.get("model_group") or None |
|
for callback in litellm._async_failure_callback: |
|
if isinstance(callback, CustomLogger): |
|
await callback.log_model_group_rate_limit_error( |
|
exception=exception, |
|
original_model_group=model_group, |
|
kwargs=self.model_call_details, |
|
) |
|
|
|
def failure_handler( |
|
self, exception, traceback_exception, start_time=None, end_time=None |
|
): |
|
verbose_logger.debug( |
|
f"Logging Details LiteLLM-Failure Call: {litellm.failure_callback}" |
|
) |
|
try: |
|
start_time, end_time = self._failure_handler_helper_fn( |
|
exception=exception, |
|
traceback_exception=traceback_exception, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
callbacks = self.get_combined_callback_list( |
|
dynamic_success_callbacks=self.dynamic_failure_callbacks, |
|
global_callbacks=litellm.failure_callback, |
|
) |
|
|
|
result = None |
|
|
|
result = redact_message_input_output_from_logging( |
|
model_call_details=( |
|
self.model_call_details |
|
if hasattr(self, "model_call_details") |
|
else {} |
|
), |
|
result=result, |
|
) |
|
for callback in callbacks: |
|
try: |
|
if callback == "lunary" and lunaryLogger is not None: |
|
print_verbose("reaches lunary for logging error!") |
|
|
|
model = self.model |
|
|
|
input = self.model_call_details["input"] |
|
|
|
_type = ( |
|
"embed" |
|
if self.call_type == CallTypes.embedding.value |
|
else "llm" |
|
) |
|
|
|
lunaryLogger.log_event( |
|
kwargs=self.model_call_details, |
|
type=_type, |
|
event="error", |
|
user_id=self.model_call_details.get("user", "default"), |
|
model=model, |
|
input=input, |
|
error=traceback_exception, |
|
run_id=self.litellm_call_id, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
) |
|
if callback == "sentry": |
|
print_verbose("sending exception to sentry") |
|
if capture_exception: |
|
capture_exception(exception) |
|
else: |
|
print_verbose( |
|
f"capture exception not initialized: {capture_exception}" |
|
) |
|
elif callback == "supabase" and supabaseClient is not None: |
|
print_verbose("reaches supabase for logging!") |
|
print_verbose(f"supabaseClient: {supabaseClient}") |
|
supabaseClient.log_event( |
|
model=self.model if hasattr(self, "model") else "", |
|
messages=self.messages, |
|
end_user=self.model_call_details.get("user", "default"), |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
litellm_call_id=self.model_call_details["litellm_call_id"], |
|
print_verbose=print_verbose, |
|
) |
|
if ( |
|
callable(callback) and customLogger is not None |
|
): |
|
customLogger.log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
callback_func=callback, |
|
) |
|
if ( |
|
isinstance(callback, CustomLogger) |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"acompletion", False |
|
) |
|
is not True |
|
and self.model_call_details.get("litellm_params", {}).get( |
|
"aembedding", False |
|
) |
|
is not True |
|
): |
|
|
|
callback.log_failure_event( |
|
start_time=start_time, |
|
end_time=end_time, |
|
response_obj=result, |
|
kwargs=self.model_call_details, |
|
) |
|
if callback == "langfuse": |
|
global langFuseLogger |
|
verbose_logger.debug("reaches langfuse for logging failure") |
|
kwargs = {} |
|
for k, v in self.model_call_details.items(): |
|
if ( |
|
k != "original_response" |
|
): |
|
kwargs[k] = v |
|
|
|
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( |
|
globalLangfuseLogger=langFuseLogger, |
|
standard_callback_dynamic_params=self.standard_callback_dynamic_params, |
|
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, |
|
) |
|
_response = langfuse_logger_to_use._old_log_event( |
|
start_time=start_time, |
|
end_time=end_time, |
|
response_obj=None, |
|
user_id=kwargs.get("user", None), |
|
print_verbose=print_verbose, |
|
status_message=str(exception), |
|
level="ERROR", |
|
kwargs=self.model_call_details, |
|
) |
|
if _response is not None and isinstance(_response, dict): |
|
_trace_id = _response.get("trace_id", None) |
|
if _trace_id is not None: |
|
in_memory_trace_id_cache.set_cache( |
|
litellm_call_id=self.litellm_call_id, |
|
service_name="langfuse", |
|
trace_id=_trace_id, |
|
) |
|
if callback == "traceloop": |
|
traceloopLogger.log_event( |
|
start_time=start_time, |
|
end_time=end_time, |
|
response_obj=None, |
|
user_id=self.model_call_details.get("user", None), |
|
print_verbose=print_verbose, |
|
status_message=str(exception), |
|
level="ERROR", |
|
kwargs=self.model_call_details, |
|
) |
|
if callback == "logfire" and logfireLogger is not None: |
|
verbose_logger.debug("reaches logfire for failure logging!") |
|
kwargs = {} |
|
for k, v in self.model_call_details.items(): |
|
if ( |
|
k != "original_response" |
|
): |
|
kwargs[k] = v |
|
kwargs["exception"] = exception |
|
|
|
logfireLogger.log_event( |
|
kwargs=kwargs, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
level=LogfireLevel.ERROR.value, |
|
print_verbose=print_verbose, |
|
) |
|
|
|
except Exception as e: |
|
print_verbose( |
|
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}" |
|
) |
|
print_verbose( |
|
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" |
|
) |
|
if capture_exception: |
|
capture_exception(e) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging {}".format( |
|
str(e) |
|
) |
|
) |
|
|
|
async def async_failure_handler( |
|
self, exception, traceback_exception, start_time=None, end_time=None |
|
): |
|
""" |
|
        Implements async callbacks, to handle asyncio event-loop issues when custom integrations need to use async functions.
|
""" |
|
await self.special_failure_handlers(exception=exception) |
|
start_time, end_time = self._failure_handler_helper_fn( |
|
exception=exception, |
|
traceback_exception=traceback_exception, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
|
|
callbacks = self.get_combined_callback_list( |
|
dynamic_success_callbacks=self.dynamic_async_failure_callbacks, |
|
global_callbacks=litellm._async_failure_callback, |
|
) |
|
|
|
result = None |
|
|
|
for callback in callbacks: |
|
try: |
|
if isinstance(callback, CustomLogger): |
|
await callback.async_log_failure_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
if ( |
|
callable(callback) and customLogger is not None |
|
): |
|
await customLogger.async_log_event( |
|
kwargs=self.model_call_details, |
|
response_obj=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
print_verbose=print_verbose, |
|
callback_func=callback, |
|
) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure \ |
|
logging {}\nCallback={}".format( |
|
str(e), callback |
|
) |
|
) |
|
|
|
def _get_trace_id(self, service_name: Literal["langfuse"]) -> Optional[str]: |
|
""" |
|
For the given service (e.g. langfuse), return the trace_id actually logged. |
|
|
|
        Used for constructing the URL in Slack alerting.
|
|
|
Returns: |
|
- str: The logged trace id |
|
- None: If trace id not yet emitted. |
|
""" |
|
trace_id: Optional[str] = None |
|
if service_name == "langfuse": |
|
trace_id = in_memory_trace_id_cache.get_cache( |
|
litellm_call_id=self.litellm_call_id, service_name=service_name |
|
) |
|
|
|
return trace_id |
|
|
|
def _get_callback_object(self, service_name: Literal["langfuse"]) -> Optional[Any]: |
|
""" |
|
        Return the dynamic callback object for the given service.

        Meant to solve issues with key-based/team-based logging, where request-level
        credentials can differ from the globally initialized logger.
|
""" |
|
global langFuseLogger |
|
|
|
if service_name == "langfuse": |
|
            # Re-initialize langfuse when the request provides dynamic credentials that
            # differ from the globally configured logger.
            if langFuseLogger is None or (
                (
                    self.standard_callback_dynamic_params.get("langfuse_public_key")
                    is not None
                    and self.standard_callback_dynamic_params.get("langfuse_public_key")
                    != langFuseLogger.public_key
                )
                or (
                    self.standard_callback_dynamic_params.get("langfuse_host")
                    is not None
                    and self.standard_callback_dynamic_params.get("langfuse_host")
                    != langFuseLogger.langfuse_host
                )
            ):
|
return LangFuseLogger( |
|
langfuse_public_key=self.standard_callback_dynamic_params.get( |
|
"langfuse_public_key" |
|
), |
|
langfuse_secret=self.standard_callback_dynamic_params.get( |
|
"langfuse_secret" |
|
), |
|
langfuse_host=self.standard_callback_dynamic_params.get( |
|
"langfuse_host" |
|
), |
|
) |
|
return langFuseLogger |
|
|
|
return None |
|
|
|
def handle_sync_success_callbacks_for_async_calls( |
|
self, |
|
result: Any, |
|
start_time: datetime.datetime, |
|
end_time: datetime.datetime, |
|
) -> None: |
|
""" |
|
        Handles calling success callbacks for async calls.

        Why: some callbacks (e.g. `langfuse`, `s3`) are sync-only, so they need to be run in the executor.
|
""" |
|
if self._should_run_sync_callbacks_for_async_calls() is False: |
|
return |
|
|
|
executor.submit( |
|
self.success_handler, |
|
result, |
|
start_time, |
|
end_time, |
|
) |
|
|
|
def _should_run_sync_callbacks_for_async_calls(self) -> bool: |
|
""" |
|
Returns: |
|
        - bool: True if sync callbacks should be run for async calls, e.g. `langfuse`, `s3`.
|
""" |
|
_combined_sync_callbacks = self.get_combined_callback_list( |
|
dynamic_success_callbacks=self.dynamic_success_callbacks, |
|
global_callbacks=litellm.success_callback, |
|
) |
|
_filtered_success_callbacks = self._remove_internal_custom_logger_callbacks( |
|
_combined_sync_callbacks |
|
) |
|
_filtered_success_callbacks = self._remove_internal_litellm_callbacks( |
|
_filtered_success_callbacks |
|
) |
|
return len(_filtered_success_callbacks) > 0 |
|
|
|
def get_combined_callback_list( |
|
self, dynamic_success_callbacks: Optional[List], global_callbacks: List |
|
) -> List: |
|
if dynamic_success_callbacks is None: |
|
return global_callbacks |
|
return list(set(dynamic_success_callbacks + global_callbacks)) |
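    # Illustrative sketch (not executed): the merge above deduplicates via set(), so
    # order is not guaranteed. The callback names below are arbitrary examples.
    #   self.get_combined_callback_list(
    #       dynamic_success_callbacks=["langfuse"], global_callbacks=["langfuse", "s3"]
    #   )  # -> ["langfuse", "s3"] in some order
    #   self.get_combined_callback_list(
    #       dynamic_success_callbacks=None, global_callbacks=["s3"]
    #   )  # -> ["s3"] (global list returned as-is)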
|
|
|
def _remove_internal_litellm_callbacks(self, callbacks: List) -> List: |
|
""" |
|
Creates a filtered list of callbacks, excluding internal LiteLLM callbacks. |
|
|
|
Args: |
|
callbacks: List of callback functions/strings to filter |
|
|
|
Returns: |
|
List of filtered callbacks with internal ones removed |
|
""" |
|
filtered = [ |
|
cb for cb in callbacks if not self._is_internal_litellm_proxy_callback(cb) |
|
] |
|
|
|
verbose_logger.debug(f"Filtered callbacks: {filtered}") |
|
return filtered |
|
|
|
def _get_callback_name(self, cb) -> str: |
|
""" |
|
Helper to get the name of a callback function |
|
|
|
Args: |
|
cb: The callback function/string to get the name of |
|
|
|
Returns: |
|
The name of the callback |
|
""" |
|
if hasattr(cb, "__name__"): |
|
return cb.__name__ |
|
if hasattr(cb, "__func__"): |
|
return cb.__func__.__name__ |
|
return str(cb) |
|
|
|
def _is_internal_litellm_proxy_callback(self, cb) -> bool: |
|
"""Helper to check if a callback is internal""" |
|
INTERNAL_PREFIXES = [ |
|
"_PROXY", |
|
"_service_logger.ServiceLogging", |
|
"sync_deployment_callback_on_success", |
|
] |
|
if isinstance(cb, str): |
|
return False |
|
|
|
if not callable(cb): |
|
return True |
|
|
|
cb_name = self._get_callback_name(cb) |
|
return any(prefix in cb_name for prefix in INTERNAL_PREFIXES) |
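    # Illustrative sketch (not executed); the callables named below are hypothetical:
    #   self._is_internal_litellm_proxy_callback("langfuse")        # -> False (strings are never internal)
    #   self._is_internal_litellm_proxy_callback(my_team_logger)    # -> False (ordinary user callback)
    #   self._is_internal_litellm_proxy_callback(sync_deployment_callback_on_success)  # -> True (prefix match)
    #   self._is_internal_litellm_proxy_callback(42)                # -> True (non-callable, non-string -> filtered)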
|
|
|
def _remove_internal_custom_logger_callbacks(self, callbacks: List) -> List: |
|
""" |
|
Removes internal custom logger callbacks from the list. |
|
""" |
|
_new_callbacks = [] |
|
for _c in callbacks: |
|
if isinstance(_c, CustomLogger): |
|
continue |
|
elif ( |
|
isinstance(_c, str) |
|
and _c in litellm._known_custom_logger_compatible_callbacks |
|
): |
|
continue |
|
_new_callbacks.append(_c) |
|
return _new_callbacks |
|
|
|
def _get_assembled_streaming_response( |
|
self, |
|
result: Union[ModelResponse, TextCompletionResponse, ModelResponseStream, Any], |
|
start_time: datetime.datetime, |
|
end_time: datetime.datetime, |
|
is_async: bool, |
|
streaming_chunks: List[Any], |
|
) -> Optional[Union[ModelResponse, TextCompletionResponse]]: |
|
if isinstance(result, ModelResponse): |
|
return result |
|
elif isinstance(result, TextCompletionResponse): |
|
return result |
|
elif isinstance(result, ModelResponseStream): |
|
complete_streaming_response: Optional[ |
|
Union[ModelResponse, TextCompletionResponse] |
|
] = _assemble_complete_response_from_streaming_chunks( |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
request_kwargs=self.model_call_details, |
|
streaming_chunks=streaming_chunks, |
|
is_async=is_async, |
|
) |
|
return complete_streaming_response |
|
return None |
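    # Illustrative sketch (not executed), assuming the caller accumulates chunks as they
    # arrive; the actual assembly logic lives in _assemble_complete_response_from_streaming_chunks:
    #   streaming_chunks: List[Any] = []
    #   for chunk in stream:                    # each chunk is a ModelResponseStream
    #       streaming_chunks.append(chunk)
    #   complete = self._get_assembled_streaming_response(
    #       result=chunk,                       # the most recent chunk
    #       start_time=start_time,
    #       end_time=end_time,
    #       is_async=False,
    #       streaming_chunks=streaming_chunks,
    #   )  # -> ModelResponse / TextCompletionResponse, or None if nothing could be assembled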
|
|
|
|
|
def set_callbacks(callback_list, function_id=None): |
|
""" |
|
Globally sets the callback client |
|
""" |
|
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, supabaseClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger |
|
|
|
try: |
|
for callback in callback_list: |
|
if callback == "sentry": |
|
try: |
|
import sentry_sdk |
|
except ImportError: |
|
print_verbose("Package 'sentry_sdk' is missing. Installing it...") |
|
subprocess.check_call( |
|
[sys.executable, "-m", "pip", "install", "sentry_sdk"] |
|
) |
|
import sentry_sdk |
|
sentry_sdk_instance = sentry_sdk |
|
                sentry_trace_rate = os.environ.get("SENTRY_API_TRACE_RATE", "1.0")
|
sentry_sdk_instance.init( |
|
dsn=os.environ.get("SENTRY_DSN"), |
|
traces_sample_rate=float(sentry_trace_rate), |
|
) |
|
capture_exception = sentry_sdk_instance.capture_exception |
|
add_breadcrumb = sentry_sdk_instance.add_breadcrumb |
|
elif callback == "posthog": |
|
try: |
|
from posthog import Posthog |
|
except ImportError: |
|
print_verbose("Package 'posthog' is missing. Installing it...") |
|
subprocess.check_call( |
|
[sys.executable, "-m", "pip", "install", "posthog"] |
|
) |
|
from posthog import Posthog |
|
posthog = Posthog( |
|
project_api_key=os.environ.get("POSTHOG_API_KEY"), |
|
host=os.environ.get("POSTHOG_API_URL"), |
|
) |
|
elif callback == "slack": |
|
try: |
|
from slack_bolt import App |
|
except ImportError: |
|
print_verbose("Package 'slack_bolt' is missing. Installing it...") |
|
subprocess.check_call( |
|
[sys.executable, "-m", "pip", "install", "slack_bolt"] |
|
) |
|
from slack_bolt import App |
|
slack_app = App( |
|
token=os.environ.get("SLACK_API_TOKEN"), |
|
signing_secret=os.environ.get("SLACK_API_SECRET"), |
|
) |
|
alerts_channel = os.environ["SLACK_API_CHANNEL"] |
|
print_verbose(f"Initialized Slack App: {slack_app}") |
|
elif callback == "traceloop": |
|
traceloopLogger = TraceloopLogger() |
|
elif callback == "athina": |
|
athinaLogger = AthinaLogger() |
|
print_verbose("Initialized Athina Logger") |
|
elif callback == "helicone": |
|
heliconeLogger = HeliconeLogger() |
|
elif callback == "lunary": |
|
lunaryLogger = LunaryLogger() |
|
elif callback == "promptlayer": |
|
promptLayerLogger = PromptLayerLogger() |
|
elif callback == "langfuse": |
|
langFuseLogger = LangFuseLogger( |
|
langfuse_public_key=None, langfuse_secret=None, langfuse_host=None |
|
) |
|
elif callback == "openmeter": |
|
openMeterLogger = OpenMeterLogger() |
|
elif callback == "datadog": |
|
dataDogLogger = DataDogLogger() |
|
elif callback == "dynamodb": |
|
dynamoLogger = DyanmoDBLogger() |
|
elif callback == "s3": |
|
s3Logger = S3Logger() |
|
elif callback == "wandb": |
|
weightsBiasesLogger = WeightsBiasesLogger() |
|
elif callback == "logfire": |
|
logfireLogger = LogfireLogger() |
|
elif callback == "supabase": |
|
print_verbose("instantiating supabase") |
|
supabaseClient = Supabase() |
|
elif callback == "greenscale": |
|
greenscaleLogger = GreenscaleLogger() |
|
print_verbose("Initialized Greenscale Logger") |
|
elif callable(callback): |
|
customLogger = CustomLogger() |
|
except Exception as e: |
|
raise e |
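# Illustrative usage sketch (not executed); values are placeholders:
#   os.environ["SENTRY_DSN"] = "https://<key>@<org>.ingest.sentry.io/<project>"
#   os.environ["SENTRY_API_TRACE_RATE"] = "0.5"
#   set_callbacks(["sentry", "langfuse"])
# Afterwards, module-level globals such as `capture_exception` and `langFuseLogger` are
# populated and consumed by the success/failure handlers defined above.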
|
|
|
|
|
def _init_custom_logger_compatible_class( |
|
logging_integration: _custom_logger_compatible_callbacks_literal, |
|
internal_usage_cache: Optional[DualCache], |
|
llm_router: Optional[ |
|
Any |
|
], |
|
    custom_logger_init_args: Optional[dict] = None,
|
) -> Optional[CustomLogger]: |
|
""" |
|
Initialize a custom logger compatible class |
|
""" |
|
try: |
|
custom_logger_init_args = custom_logger_init_args or {} |
|
if logging_integration == "lago": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LagoLogger): |
|
return callback |
|
|
|
lago_logger = LagoLogger() |
|
_in_memory_loggers.append(lago_logger) |
|
return lago_logger |
|
elif logging_integration == "openmeter": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpenMeterLogger): |
|
return callback |
|
|
|
_openmeter_logger = OpenMeterLogger() |
|
_in_memory_loggers.append(_openmeter_logger) |
|
return _openmeter_logger |
|
elif logging_integration == "braintrust": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, BraintrustLogger): |
|
return callback |
|
|
|
braintrust_logger = BraintrustLogger() |
|
_in_memory_loggers.append(braintrust_logger) |
|
return braintrust_logger |
|
elif logging_integration == "langsmith": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LangsmithLogger): |
|
return callback |
|
|
|
_langsmith_logger = LangsmithLogger() |
|
_in_memory_loggers.append(_langsmith_logger) |
|
return _langsmith_logger |
|
elif logging_integration == "argilla": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, ArgillaLogger): |
|
return callback |
|
|
|
_argilla_logger = ArgillaLogger() |
|
_in_memory_loggers.append(_argilla_logger) |
|
return _argilla_logger |
|
elif logging_integration == "literalai": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LiteralAILogger): |
|
return callback |
|
|
|
_literalai_logger = LiteralAILogger() |
|
_in_memory_loggers.append(_literalai_logger) |
|
return _literalai_logger |
|
elif logging_integration == "prometheus": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, PrometheusLogger): |
|
return callback |
|
|
|
_prometheus_logger = PrometheusLogger() |
|
_in_memory_loggers.append(_prometheus_logger) |
|
return _prometheus_logger |
|
elif logging_integration == "datadog": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, DataDogLogger): |
|
return callback |
|
|
|
_datadog_logger = DataDogLogger() |
|
_in_memory_loggers.append(_datadog_logger) |
|
return _datadog_logger |
|
elif logging_integration == "datadog_llm_observability": |
|
_datadog_llm_obs_logger = DataDogLLMObsLogger() |
|
_in_memory_loggers.append(_datadog_llm_obs_logger) |
|
return _datadog_llm_obs_logger |
|
elif logging_integration == "gcs_bucket": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, GCSBucketLogger): |
|
return callback |
|
|
|
_gcs_bucket_logger = GCSBucketLogger() |
|
_in_memory_loggers.append(_gcs_bucket_logger) |
|
return _gcs_bucket_logger |
|
elif logging_integration == "azure_storage": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, AzureBlobStorageLogger): |
|
return callback |
|
|
|
_azure_storage_logger = AzureBlobStorageLogger() |
|
_in_memory_loggers.append(_azure_storage_logger) |
|
return _azure_storage_logger |
|
elif logging_integration == "opik": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpikLogger): |
|
return callback |
|
|
|
_opik_logger = OpikLogger() |
|
_in_memory_loggers.append(_opik_logger) |
|
return _opik_logger |
|
elif logging_integration == "arize": |
|
from litellm.integrations.opentelemetry import ( |
|
OpenTelemetry, |
|
OpenTelemetryConfig, |
|
) |
|
|
|
otel_config = ArizeLogger.get_arize_opentelemetry_config() |
|
if otel_config is None: |
|
raise ValueError( |
|
"No valid endpoint found for Arize, please set 'ARIZE_ENDPOINT' to your GRPC endpoint or 'ARIZE_HTTP_ENDPOINT' to your HTTP endpoint" |
|
) |
|
os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( |
|
f"space_key={os.getenv('ARIZE_SPACE_KEY')},api_key={os.getenv('ARIZE_API_KEY')}" |
|
) |
|
for callback in _in_memory_loggers: |
|
if ( |
|
isinstance(callback, OpenTelemetry) |
|
and callback.callback_name == "arize" |
|
): |
|
return callback |
|
_otel_logger = OpenTelemetry(config=otel_config, callback_name="arize") |
|
_in_memory_loggers.append(_otel_logger) |
|
return _otel_logger |
|
elif logging_integration == "otel": |
|
from litellm.integrations.opentelemetry import OpenTelemetry |
|
|
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpenTelemetry): |
|
return callback |
|
otel_logger = OpenTelemetry( |
|
**_get_custom_logger_settings_from_proxy_server( |
|
callback_name=logging_integration |
|
) |
|
) |
|
_in_memory_loggers.append(otel_logger) |
|
return otel_logger |
|
|
|
elif logging_integration == "galileo": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, GalileoObserve): |
|
return callback |
|
|
|
galileo_logger = GalileoObserve() |
|
_in_memory_loggers.append(galileo_logger) |
|
return galileo_logger |
|
elif logging_integration == "logfire": |
|
if "LOGFIRE_TOKEN" not in os.environ: |
|
raise ValueError("LOGFIRE_TOKEN not found in environment variables") |
|
from litellm.integrations.opentelemetry import ( |
|
OpenTelemetry, |
|
OpenTelemetryConfig, |
|
) |
|
|
|
otel_config = OpenTelemetryConfig( |
|
exporter="otlp_http", |
|
endpoint="https://logfire-api.pydantic.dev/v1/traces", |
|
headers=f"Authorization={os.getenv('LOGFIRE_TOKEN')}", |
|
) |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpenTelemetry): |
|
return callback |
|
_otel_logger = OpenTelemetry(config=otel_config) |
|
_in_memory_loggers.append(_otel_logger) |
|
return _otel_logger |
|
elif logging_integration == "dynamic_rate_limiter": |
|
from litellm.proxy.hooks.dynamic_rate_limiter import ( |
|
_PROXY_DynamicRateLimitHandler, |
|
) |
|
|
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, _PROXY_DynamicRateLimitHandler): |
|
return callback |
|
|
|
if internal_usage_cache is None: |
|
raise Exception( |
|
"Internal Error: Cache cannot be empty - internal_usage_cache={}".format( |
|
internal_usage_cache |
|
) |
|
) |
|
|
|
dynamic_rate_limiter_obj = _PROXY_DynamicRateLimitHandler( |
|
internal_usage_cache=internal_usage_cache |
|
) |
|
|
|
if llm_router is not None and isinstance(llm_router, litellm.Router): |
|
dynamic_rate_limiter_obj.update_variables(llm_router=llm_router) |
|
_in_memory_loggers.append(dynamic_rate_limiter_obj) |
|
return dynamic_rate_limiter_obj |
|
elif logging_integration == "langtrace": |
|
if "LANGTRACE_API_KEY" not in os.environ: |
|
raise ValueError("LANGTRACE_API_KEY not found in environment variables") |
|
|
|
from litellm.integrations.opentelemetry import ( |
|
OpenTelemetry, |
|
OpenTelemetryConfig, |
|
) |
|
|
|
otel_config = OpenTelemetryConfig( |
|
exporter="otlp_http", |
|
endpoint="https://langtrace.ai/api/trace", |
|
) |
|
os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( |
|
f"api_key={os.getenv('LANGTRACE_API_KEY')}" |
|
) |
|
for callback in _in_memory_loggers: |
|
if ( |
|
isinstance(callback, OpenTelemetry) |
|
and callback.callback_name == "langtrace" |
|
): |
|
return callback |
|
_otel_logger = OpenTelemetry(config=otel_config, callback_name="langtrace") |
|
_in_memory_loggers.append(_otel_logger) |
|
return _otel_logger |
|
|
|
elif logging_integration == "mlflow": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, MlflowLogger): |
|
return callback |
|
|
|
_mlflow_logger = MlflowLogger() |
|
_in_memory_loggers.append(_mlflow_logger) |
|
return _mlflow_logger |
|
elif logging_integration == "langfuse": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LangfusePromptManagement): |
|
return callback |
|
|
|
langfuse_logger = LangfusePromptManagement() |
|
_in_memory_loggers.append(langfuse_logger) |
|
return langfuse_logger |
|
elif logging_integration == "pagerduty": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, PagerDutyAlerting): |
|
return callback |
|
pagerduty_logger = PagerDutyAlerting(**custom_logger_init_args) |
|
_in_memory_loggers.append(pagerduty_logger) |
|
return pagerduty_logger |
|
elif logging_integration == "gcs_pubsub": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, GcsPubSubLogger): |
|
return callback |
|
_gcs_pubsub_logger = GcsPubSubLogger() |
|
_in_memory_loggers.append(_gcs_pubsub_logger) |
|
return _gcs_pubsub_logger |
|
elif logging_integration == "humanloop": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, HumanloopLogger): |
|
return callback |
|
|
|
humanloop_logger = HumanloopLogger() |
|
_in_memory_loggers.append(humanloop_logger) |
|
return humanloop_logger |
|
except Exception as e: |
|
verbose_logger.exception( |
|
f"[Non-Blocking Error] Error initializing custom logger: {e}" |
|
) |
|
return None |
|
|
|
|
|
def get_custom_logger_compatible_class( |
|
logging_integration: _custom_logger_compatible_callbacks_literal, |
|
) -> Optional[CustomLogger]: |
|
try: |
|
if logging_integration == "lago": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LagoLogger): |
|
return callback |
|
elif logging_integration == "openmeter": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpenMeterLogger): |
|
return callback |
|
elif logging_integration == "braintrust": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, BraintrustLogger): |
|
return callback |
|
elif logging_integration == "galileo": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, GalileoObserve): |
|
return callback |
|
elif logging_integration == "langsmith": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LangsmithLogger): |
|
return callback |
|
elif logging_integration == "argilla": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, ArgillaLogger): |
|
return callback |
|
elif logging_integration == "literalai": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LiteralAILogger): |
|
return callback |
|
elif logging_integration == "prometheus": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, PrometheusLogger): |
|
return callback |
|
elif logging_integration == "datadog": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, DataDogLogger): |
|
return callback |
|
elif logging_integration == "datadog_llm_observability": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, DataDogLLMObsLogger): |
|
return callback |
|
elif logging_integration == "gcs_bucket": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, GCSBucketLogger): |
|
return callback |
|
elif logging_integration == "azure_storage": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, AzureBlobStorageLogger): |
|
return callback |
|
elif logging_integration == "opik": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpikLogger): |
|
return callback |
|
elif logging_integration == "langfuse": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, LangfusePromptManagement): |
|
return callback |
|
elif logging_integration == "otel": |
|
from litellm.integrations.opentelemetry import OpenTelemetry |
|
|
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpenTelemetry): |
|
return callback |
|
elif logging_integration == "arize": |
|
from litellm.integrations.opentelemetry import OpenTelemetry |
|
|
|
if "ARIZE_SPACE_KEY" not in os.environ: |
|
raise ValueError("ARIZE_SPACE_KEY not found in environment variables") |
|
if "ARIZE_API_KEY" not in os.environ: |
|
raise ValueError("ARIZE_API_KEY not found in environment variables") |
|
for callback in _in_memory_loggers: |
|
if ( |
|
isinstance(callback, OpenTelemetry) |
|
and callback.callback_name == "arize" |
|
): |
|
return callback |
|
elif logging_integration == "logfire": |
|
if "LOGFIRE_TOKEN" not in os.environ: |
|
raise ValueError("LOGFIRE_TOKEN not found in environment variables") |
|
from litellm.integrations.opentelemetry import OpenTelemetry |
|
|
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, OpenTelemetry): |
|
return callback |
|
|
|
elif logging_integration == "dynamic_rate_limiter": |
|
from litellm.proxy.hooks.dynamic_rate_limiter import ( |
|
_PROXY_DynamicRateLimitHandler, |
|
) |
|
|
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, _PROXY_DynamicRateLimitHandler): |
|
return callback |
|
|
|
elif logging_integration == "langtrace": |
|
from litellm.integrations.opentelemetry import OpenTelemetry |
|
|
|
if "LANGTRACE_API_KEY" not in os.environ: |
|
raise ValueError("LANGTRACE_API_KEY not found in environment variables") |
|
|
|
for callback in _in_memory_loggers: |
|
if ( |
|
isinstance(callback, OpenTelemetry) |
|
and callback.callback_name == "langtrace" |
|
): |
|
return callback |
|
|
|
elif logging_integration == "mlflow": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, MlflowLogger): |
|
return callback |
|
elif logging_integration == "pagerduty": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, PagerDutyAlerting): |
|
return callback |
|
elif logging_integration == "gcs_pubsub": |
|
for callback in _in_memory_loggers: |
|
if isinstance(callback, GcsPubSubLogger): |
|
return callback |
|
|
|
return None |
|
except Exception as e: |
|
verbose_logger.exception( |
|
f"[Non-Blocking Error] Error getting custom logger: {e}" |
|
) |
|
return None |
|
|
|
|
|
def _get_custom_logger_settings_from_proxy_server(callback_name: str) -> Dict: |
|
""" |
|
Get the settings for a custom logger from the proxy server config.yaml |
|
|
|
Proxy server config.yaml defines callback_settings as: |
|
|
|
callback_settings: |
|
otel: |
|
message_logging: False |
|
""" |
|
from litellm.proxy.proxy_server import callback_settings |
|
|
|
if callback_settings: |
|
return dict(callback_settings.get(callback_name, {})) |
|
return {} |
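# Illustrative sketch (not executed): with the config.yaml shown in the docstring above,
#   _get_custom_logger_settings_from_proxy_server("otel")     # -> {"message_logging": False}
#   _get_custom_logger_settings_from_proxy_server("datadog")  # -> {} (no settings defined)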
|
|
|
|
|
def use_custom_pricing_for_model(litellm_params: Optional[dict]) -> bool: |
|
""" |
|
Check if the model uses custom pricing |
|
|
|
Returns True if any of `SPECIAL_MODEL_INFO_PARAMS` are present in `litellm_params` or `model_info` |
|
""" |
|
if litellm_params is None: |
|
return False |
|
|
|
metadata: dict = litellm_params.get("metadata", {}) or {} |
|
model_info: dict = metadata.get("model_info", {}) or {} |
|
|
|
for _custom_cost_param in SPECIAL_MODEL_INFO_PARAMS: |
|
if litellm_params.get(_custom_cost_param, None) is not None: |
|
return True |
|
elif model_info.get(_custom_cost_param, None) is not None: |
|
return True |
|
|
|
return False |
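# Illustrative sketch (not executed), assuming "input_cost_per_token" is one of the
# SPECIAL_MODEL_INFO_PARAMS imported above (check litellm.types.router for the real list):
#   use_custom_pricing_for_model({"input_cost_per_token": 1e-5})                                # -> True
#   use_custom_pricing_for_model({"metadata": {"model_info": {"input_cost_per_token": 1e-5}}})  # -> True
#   use_custom_pricing_for_model({"metadata": {}})                                              # -> False
#   use_custom_pricing_for_model(None)                                                          # -> False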
|
|
|
|
|
def is_valid_sha256_hash(value: str) -> bool: |
|
|
|
return bool(re.fullmatch(r"[a-fA-F0-9]{64}", value)) |
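# Illustrative sketch (not executed):
#   is_valid_sha256_hash("a" * 64)   # -> True  (64 hex characters)
#   is_valid_sha256_hash("a" * 63)   # -> False (wrong length)
#   is_valid_sha256_hash("sk-1234")  # -> False (not a hex digest)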
|
|
|
|
|
class StandardLoggingPayloadSetup: |
|
@staticmethod |
|
def cleanup_timestamps( |
|
start_time: Union[dt_object, float], |
|
end_time: Union[dt_object, float], |
|
completion_start_time: Union[dt_object, float], |
|
) -> Tuple[float, float, float]: |
|
""" |
|
Convert datetime objects to floats |
|
|
|
Args: |
|
start_time: Union[dt_object, float] |
|
end_time: Union[dt_object, float] |
|
completion_start_time: Union[dt_object, float] |
|
|
|
Returns: |
|
Tuple[float, float, float]: A tuple containing the start time, end time, and completion start time as floats. |
|
""" |
|
|
|
if isinstance(start_time, datetime.datetime): |
|
start_time_float = start_time.timestamp() |
|
elif isinstance(start_time, float): |
|
start_time_float = start_time |
|
else: |
|
raise ValueError( |
|
f"start_time is required, got={start_time} of type {type(start_time)}" |
|
) |
|
|
|
if isinstance(end_time, datetime.datetime): |
|
end_time_float = end_time.timestamp() |
|
elif isinstance(end_time, float): |
|
end_time_float = end_time |
|
else: |
|
raise ValueError( |
|
f"end_time is required, got={end_time} of type {type(end_time)}" |
|
) |
|
|
|
if isinstance(completion_start_time, datetime.datetime): |
|
completion_start_time_float = completion_start_time.timestamp() |
|
elif isinstance(completion_start_time, float): |
|
completion_start_time_float = completion_start_time |
|
else: |
|
completion_start_time_float = end_time_float |
|
|
|
return start_time_float, end_time_float, completion_start_time_float |
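    # Illustrative sketch (not executed): datetimes become epoch floats, floats pass through
    # unchanged, and a completion_start_time of any other type falls back to end_time.
    #   start = datetime.datetime(2024, 1, 1, 0, 0, 0)
    #   end = start.timestamp() + 2.0
    #   StandardLoggingPayloadSetup.cleanup_timestamps(start, end, completion_start_time=end)
    #   # -> (start.timestamp(), end, end)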
|
|
|
@staticmethod |
|
def get_standard_logging_metadata( |
|
metadata: Optional[Dict[str, Any]], |
|
litellm_params: Optional[dict] = None, |
|
prompt_integration: Optional[str] = None, |
|
) -> StandardLoggingMetadata: |
|
""" |
|
Clean and filter the metadata dictionary to include only the specified keys in StandardLoggingMetadata. |
|
|
|
Args: |
|
            metadata (Optional[Dict[str, Any]]): The original metadata dictionary.
            litellm_params (Optional[dict]): Request-level litellm params; used to extract prompt-management info.
            prompt_integration (Optional[str]): Name of the prompt-management integration, if any.
|
|
|
Returns: |
|
StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata. |
|
|
|
Note: |
|
- If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned. |
|
- If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'. |
|
""" |
|
prompt_management_metadata: Optional[ |
|
StandardLoggingPromptManagementMetadata |
|
] = None |
|
if litellm_params is not None: |
|
prompt_id = cast(Optional[str], litellm_params.get("prompt_id", None)) |
|
prompt_variables = cast( |
|
Optional[dict], litellm_params.get("prompt_variables", None) |
|
) |
|
|
|
if prompt_id is not None and prompt_integration is not None: |
|
prompt_management_metadata = StandardLoggingPromptManagementMetadata( |
|
prompt_id=prompt_id, |
|
prompt_variables=prompt_variables, |
|
prompt_integration=prompt_integration, |
|
) |
|
|
|
|
|
clean_metadata = StandardLoggingMetadata( |
|
user_api_key_hash=None, |
|
user_api_key_alias=None, |
|
user_api_key_team_id=None, |
|
user_api_key_org_id=None, |
|
user_api_key_user_id=None, |
|
user_api_key_team_alias=None, |
|
spend_logs_metadata=None, |
|
requester_ip_address=None, |
|
requester_metadata=None, |
|
user_api_key_end_user_id=None, |
|
prompt_management_metadata=prompt_management_metadata, |
|
) |
|
if isinstance(metadata, dict): |
|
|
|
supported_keys = StandardLoggingMetadata.__annotations__.keys() |
|
for key in supported_keys: |
|
if key in metadata: |
|
clean_metadata[key] = metadata[key] |
|
|
|
if metadata.get("user_api_key") is not None: |
|
if is_valid_sha256_hash(str(metadata.get("user_api_key"))): |
|
clean_metadata["user_api_key_hash"] = metadata.get( |
|
"user_api_key" |
|
) |
|
_potential_requester_metadata = metadata.get( |
|
"metadata", None |
|
) |
|
if ( |
|
clean_metadata["requester_metadata"] is None |
|
and _potential_requester_metadata is not None |
|
and isinstance(_potential_requester_metadata, dict) |
|
): |
|
clean_metadata["requester_metadata"] = _potential_requester_metadata |
|
return clean_metadata |
|
|
|
@staticmethod |
|
def get_usage_from_response_obj(response_obj: Optional[dict]) -> Usage: |
|
|
|
if response_obj is None: |
|
return Usage( |
|
prompt_tokens=0, |
|
completion_tokens=0, |
|
total_tokens=0, |
|
) |
|
|
|
usage = response_obj.get("usage", None) or {} |
|
if usage is None or ( |
|
not isinstance(usage, dict) and not isinstance(usage, Usage) |
|
): |
|
return Usage( |
|
prompt_tokens=0, |
|
completion_tokens=0, |
|
total_tokens=0, |
|
) |
|
elif isinstance(usage, Usage): |
|
return usage |
|
elif isinstance(usage, dict): |
|
return Usage(**usage) |
|
|
|
raise ValueError(f"usage is required, got={usage} of type {type(usage)}") |
|
|
|
@staticmethod |
|
def get_model_cost_information( |
|
base_model: Optional[str], |
|
custom_pricing: Optional[bool], |
|
custom_llm_provider: Optional[str], |
|
init_response_obj: Union[Any, BaseModel, dict], |
|
) -> StandardLoggingModelInformation: |
|
|
|
model_cost_name = _select_model_name_for_cost_calc( |
|
model=None, |
|
completion_response=init_response_obj, |
|
base_model=base_model, |
|
custom_pricing=custom_pricing, |
|
) |
|
if model_cost_name is None: |
|
model_cost_information = StandardLoggingModelInformation( |
|
model_map_key="", model_map_value=None |
|
) |
|
else: |
|
try: |
|
_model_cost_information = litellm.get_model_info( |
|
model=model_cost_name, custom_llm_provider=custom_llm_provider |
|
) |
|
model_cost_information = StandardLoggingModelInformation( |
|
model_map_key=model_cost_name, |
|
model_map_value=_model_cost_information, |
|
) |
|
except Exception: |
|
verbose_logger.debug( |
|
"Model={} is not mapped in model cost map. Defaulting to None model_cost_information for standard_logging_payload".format( |
|
model_cost_name |
|
) |
|
) |
|
model_cost_information = StandardLoggingModelInformation( |
|
model_map_key=model_cost_name, model_map_value=None |
|
) |
|
return model_cost_information |
|
|
|
@staticmethod |
|
def get_final_response_obj( |
|
response_obj: dict, init_response_obj: Union[Any, BaseModel, dict], kwargs: dict |
|
) -> Optional[Union[dict, str, list]]: |
|
""" |
|
Get final response object after redacting the message input/output from logging |
|
""" |
|
if response_obj is not None: |
|
final_response_obj: Optional[Union[dict, str, list]] = response_obj |
|
elif isinstance(init_response_obj, list) or isinstance(init_response_obj, str): |
|
final_response_obj = init_response_obj |
|
else: |
|
final_response_obj = None |
|
|
|
modified_final_response_obj = redact_message_input_output_from_logging( |
|
model_call_details=kwargs, |
|
result=final_response_obj, |
|
) |
|
|
|
if modified_final_response_obj is not None and isinstance( |
|
modified_final_response_obj, BaseModel |
|
): |
|
final_response_obj = modified_final_response_obj.model_dump() |
|
else: |
|
final_response_obj = modified_final_response_obj |
|
|
|
return final_response_obj |
|
|
|
@staticmethod |
|
    def get_additional_headers(
        additional_headers: Optional[dict],
    ) -> Optional[StandardLoggingAdditionalHeaders]:
        if additional_headers is None:
            return None

        additional_logging_headers: StandardLoggingAdditionalHeaders = {}

        for key in StandardLoggingAdditionalHeaders.__annotations__.keys():
            # annotation keys are snake_case; response headers are lowercase and dash-separated
            _key = key.lower().replace("_", "-")
            if _key in additional_headers:
                try:
                    additional_logging_headers[key] = int(additional_headers[_key])
                except (ValueError, TypeError):
                    verbose_logger.debug(
                        f"Could not convert {additional_headers[_key]} to int for key {key}."
                    )
|
return additional_logging_headers |
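    # Illustrative sketch (not executed): annotation keys are matched against lowercase,
    # dash-separated header names and coerced to int. The header below shows the expected
    # shape; whether it is a real StandardLoggingAdditionalHeaders key depends on the TypedDict.
    #   StandardLoggingPayloadSetup.get_additional_headers(
    #       {"x-ratelimit-remaining-requests": "99", "content-type": "application/json"}
    #   )  # -> e.g. {"x_ratelimit_remaining_requests": 99}; non-numeric/unknown headers are skipped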
|
|
|
@staticmethod |
|
def get_hidden_params( |
|
hidden_params: Optional[dict], |
|
) -> StandardLoggingHiddenParams: |
|
clean_hidden_params = StandardLoggingHiddenParams( |
|
model_id=None, |
|
cache_key=None, |
|
api_base=None, |
|
response_cost=None, |
|
additional_headers=None, |
|
litellm_overhead_time_ms=None, |
|
) |
|
if hidden_params is not None: |
|
for key in StandardLoggingHiddenParams.__annotations__.keys(): |
|
if key in hidden_params: |
|
if key == "additional_headers": |
|
clean_hidden_params["additional_headers"] = ( |
|
StandardLoggingPayloadSetup.get_additional_headers( |
|
hidden_params[key] |
|
) |
|
) |
|
else: |
|
clean_hidden_params[key] = hidden_params[key] |
|
return clean_hidden_params |
|
|
|
@staticmethod |
|
def strip_trailing_slash(api_base: Optional[str]) -> Optional[str]: |
|
if api_base: |
|
return api_base.rstrip("/") |
|
return api_base |
|
|
|
@staticmethod |
|
def get_error_information( |
|
original_exception: Optional[Exception], |
|
) -> StandardLoggingPayloadErrorInformation: |
|
error_status: str = str(getattr(original_exception, "status_code", "")) |
|
error_class: str = ( |
|
str(original_exception.__class__.__name__) if original_exception else "" |
|
) |
|
_llm_provider_in_exception = getattr(original_exception, "llm_provider", "") |
|
return StandardLoggingPayloadErrorInformation( |
|
error_code=error_status, |
|
error_class=error_class, |
|
llm_provider=_llm_provider_in_exception, |
|
) |
|
|
|
@staticmethod |
|
def get_response_time( |
|
start_time_float: float, |
|
end_time_float: float, |
|
completion_start_time_float: float, |
|
stream: bool, |
|
) -> float: |
|
""" |
|
Get the response time for the LLM response |
|
|
|
Args: |
|
start_time_float: float - start time of the LLM call |
|
end_time_float: float - end time of the LLM call |
|
completion_start_time_float: float - time to first token of the LLM response (for streaming responses) |
|
stream: bool - True when a stream response is returned |
|
|
|
Returns: |
|
float: The response time for the LLM response |
|
""" |
|
if stream is True: |
|
return completion_start_time_float - start_time_float |
|
else: |
|
return end_time_float - start_time_float |
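    # Illustrative worked example (not executed):
    #   start=100.0, completion_start=100.4, end=102.0
    #   stream=True  -> 100.4 - 100.0 = 0.4   (time to first token)
    #   stream=False -> 102.0 - 100.0 = 2.0   (total request latency)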
|
|
|
|
|
def get_standard_logging_object_payload( |
|
kwargs: Optional[dict], |
|
init_response_obj: Union[Any, BaseModel, dict], |
|
start_time: dt_object, |
|
end_time: dt_object, |
|
logging_obj: Logging, |
|
status: StandardLoggingPayloadStatus, |
|
error_str: Optional[str] = None, |
|
original_exception: Optional[Exception] = None, |
|
) -> Optional[StandardLoggingPayload]: |
|
try: |
|
kwargs = kwargs or {} |
|
|
|
hidden_params: Optional[dict] = None |
|
if init_response_obj is None: |
|
response_obj = {} |
|
elif isinstance(init_response_obj, BaseModel): |
|
response_obj = init_response_obj.model_dump() |
|
hidden_params = getattr(init_response_obj, "_hidden_params", None) |
|
elif isinstance(init_response_obj, dict): |
|
response_obj = init_response_obj |
|
else: |
|
response_obj = {} |
|
|
|
if original_exception is not None and hidden_params is None: |
|
response_headers = _get_response_headers(original_exception) |
|
if response_headers is not None: |
|
hidden_params = dict( |
|
StandardLoggingHiddenParams( |
|
additional_headers=StandardLoggingPayloadSetup.get_additional_headers( |
|
dict(response_headers) |
|
), |
|
model_id=None, |
|
cache_key=None, |
|
api_base=None, |
|
response_cost=None, |
|
litellm_overhead_time_ms=None, |
|
) |
|
) |
|
|
|
|
|
litellm_params = kwargs.get("litellm_params", {}) |
|
proxy_server_request = litellm_params.get("proxy_server_request") or {} |
|
|
|
metadata: dict = ( |
|
litellm_params.get("litellm_metadata") |
|
or litellm_params.get("metadata", None) |
|
or {} |
|
) |
|
completion_start_time = kwargs.get("completion_start_time", end_time) |
|
call_type = kwargs.get("call_type") |
|
cache_hit = kwargs.get("cache_hit", False) |
|
usage = StandardLoggingPayloadSetup.get_usage_from_response_obj( |
|
response_obj=response_obj |
|
) |
|
id = response_obj.get("id", kwargs.get("litellm_call_id")) |
|
|
|
_model_id = metadata.get("model_info", {}).get("id", "") |
|
_model_group = metadata.get("model_group", "") |
|
|
|
request_tags = ( |
|
metadata.get("tags", []) |
|
if isinstance(metadata.get("tags", []), list) |
|
else [] |
|
) |
|
|
|
|
|
start_time_float, end_time_float, completion_start_time_float = ( |
|
StandardLoggingPayloadSetup.cleanup_timestamps( |
|
start_time=start_time, |
|
end_time=end_time, |
|
completion_start_time=completion_start_time, |
|
) |
|
) |
|
response_time = StandardLoggingPayloadSetup.get_response_time( |
|
start_time_float=start_time_float, |
|
end_time_float=end_time_float, |
|
completion_start_time_float=completion_start_time_float, |
|
stream=kwargs.get("stream", False), |
|
) |
|
|
|
clean_hidden_params = StandardLoggingPayloadSetup.get_hidden_params( |
|
hidden_params |
|
) |
|
|
|
clean_metadata = StandardLoggingPayloadSetup.get_standard_logging_metadata( |
|
metadata=metadata, |
|
litellm_params=litellm_params, |
|
prompt_integration=kwargs.get("prompt_integration", None), |
|
) |
|
|
|
_request_body = proxy_server_request.get("body", {}) |
|
end_user_id = clean_metadata["user_api_key_end_user_id"] or _request_body.get( |
|
"user", None |
|
) |
|
|
|
saved_cache_cost: float = 0.0 |
|
if cache_hit is True: |
|
|
|
id = f"{id}_cache_hit{time.time()}" |
|
saved_cache_cost = ( |
|
logging_obj._response_cost_calculator( |
|
result=init_response_obj, cache_hit=False |
|
) |
|
or 0.0 |
|
) |
|
|
|
|
|
base_model = _get_base_model_from_metadata(model_call_details=kwargs) |
|
custom_pricing = use_custom_pricing_for_model(litellm_params=litellm_params) |
|
|
|
model_cost_information = StandardLoggingPayloadSetup.get_model_cost_information( |
|
base_model=base_model, |
|
custom_pricing=custom_pricing, |
|
custom_llm_provider=kwargs.get("custom_llm_provider"), |
|
init_response_obj=init_response_obj, |
|
) |
|
response_cost: float = kwargs.get("response_cost", 0) or 0.0 |
|
|
|
error_information = StandardLoggingPayloadSetup.get_error_information( |
|
original_exception=original_exception, |
|
) |
|
|
|
|
|
final_response_obj = StandardLoggingPayloadSetup.get_final_response_obj( |
|
response_obj=response_obj, |
|
init_response_obj=init_response_obj, |
|
kwargs=kwargs, |
|
) |
|
|
|
stream: Optional[bool] = None |
|
if ( |
|
kwargs.get("complete_streaming_response") is not None |
|
or kwargs.get("async_complete_streaming_response") is not None |
|
): |
|
stream = True |
|
|
|
payload: StandardLoggingPayload = StandardLoggingPayload( |
|
id=str(id), |
|
trace_id=kwargs.get("litellm_trace_id"), |
|
call_type=call_type or "", |
|
cache_hit=cache_hit, |
|
stream=stream, |
|
status=status, |
|
custom_llm_provider=cast(Optional[str], kwargs.get("custom_llm_provider")), |
|
saved_cache_cost=saved_cache_cost, |
|
startTime=start_time_float, |
|
endTime=end_time_float, |
|
completionStartTime=completion_start_time_float, |
|
response_time=response_time, |
|
model=kwargs.get("model", "") or "", |
|
metadata=clean_metadata, |
|
cache_key=clean_hidden_params["cache_key"], |
|
response_cost=response_cost, |
|
total_tokens=usage.total_tokens, |
|
prompt_tokens=usage.prompt_tokens, |
|
completion_tokens=usage.completion_tokens, |
|
request_tags=request_tags, |
|
end_user=end_user_id or "", |
|
api_base=StandardLoggingPayloadSetup.strip_trailing_slash( |
|
litellm_params.get("api_base", "") |
|
) |
|
or "", |
|
model_group=_model_group, |
|
model_id=_model_id, |
|
requester_ip_address=clean_metadata.get("requester_ip_address", None), |
|
messages=kwargs.get("messages"), |
|
response=final_response_obj, |
|
model_parameters=kwargs.get("optional_params", None), |
|
hidden_params=clean_hidden_params, |
|
model_map_information=model_cost_information, |
|
error_str=error_str, |
|
error_information=error_information, |
|
response_cost_failure_debug_info=kwargs.get( |
|
"response_cost_failure_debug_information" |
|
), |
|
guardrail_information=metadata.get( |
|
"standard_logging_guardrail_information", None |
|
), |
|
) |
|
|
|
emit_standard_logging_payload(payload) |
|
return payload |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"Error creating standard logging object - {}".format(str(e)) |
|
) |
|
return None |
|
|
|
|
|
def emit_standard_logging_payload(payload: StandardLoggingPayload): |
|
if os.getenv("LITELLM_PRINT_STANDARD_LOGGING_PAYLOAD"): |
|
verbose_logger.info(json.dumps(payload, indent=4)) |
|
|
|
|
|
def get_standard_logging_metadata( |
|
metadata: Optional[Dict[str, Any]] |
|
) -> StandardLoggingMetadata: |
|
""" |
|
Clean and filter the metadata dictionary to include only the specified keys in StandardLoggingMetadata. |
|
|
|
Args: |
|
metadata (Optional[Dict[str, Any]]): The original metadata dictionary. |
|
|
|
Returns: |
|
StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata. |
|
|
|
Note: |
|
- If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned. |
|
- If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'. |
|
""" |
|
|
|
clean_metadata = StandardLoggingMetadata( |
|
user_api_key_hash=None, |
|
user_api_key_alias=None, |
|
user_api_key_team_id=None, |
|
user_api_key_org_id=None, |
|
user_api_key_user_id=None, |
|
user_api_key_team_alias=None, |
|
spend_logs_metadata=None, |
|
requester_ip_address=None, |
|
requester_metadata=None, |
|
user_api_key_end_user_id=None, |
|
prompt_management_metadata=None, |
|
) |
|
if isinstance(metadata, dict): |
|
|
|
clean_metadata = StandardLoggingMetadata( |
|
**{ |
|
key: metadata[key] |
|
for key in StandardLoggingMetadata.__annotations__.keys() |
|
if key in metadata |
|
} |
|
) |
|
|
|
if metadata.get("user_api_key") is not None: |
|
if is_valid_sha256_hash(str(metadata.get("user_api_key"))): |
|
clean_metadata["user_api_key_hash"] = metadata.get( |
|
"user_api_key" |
|
) |
|
return clean_metadata |
|
|
|
|
|
def scrub_sensitive_keys_in_metadata(litellm_params: Optional[dict]): |
|
if litellm_params is None: |
|
litellm_params = {} |
|
|
|
metadata = litellm_params.get("metadata", {}) or {} |
|
|
|
|
|
cleaned_user_api_key_metadata = {} |
|
if "user_api_key_metadata" in metadata and isinstance( |
|
metadata["user_api_key_metadata"], dict |
|
): |
|
for k, v in metadata["user_api_key_metadata"].items(): |
|
if k == "logging": |
|
cleaned_user_api_key_metadata[k] = ( |
|
"scrubbed_by_litellm_for_sensitive_keys" |
|
) |
|
else: |
|
cleaned_user_api_key_metadata[k] = v |
|
|
|
metadata["user_api_key_metadata"] = cleaned_user_api_key_metadata |
|
litellm_params["metadata"] = metadata |
|
|
|
return litellm_params |
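# Illustrative sketch (not executed): only the "logging" entry of user_api_key_metadata is
# scrubbed; sibling keys pass through unchanged.
#   scrub_sensitive_keys_in_metadata(
#       {"metadata": {"user_api_key_metadata": {"logging": [{"callback": "gcs"}], "team": "eng"}}}
#   )
#   # -> {"metadata": {"user_api_key_metadata":
#   #       {"logging": "scrubbed_by_litellm_for_sensitive_keys", "team": "eng"}}}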
|
|
|
|
|
|
|
def modify_integration(integration_name, integration_params): |
|
global supabaseClient |
|
if integration_name == "supabase": |
|
if "table_name" in integration_params: |
|
Supabase.supabase_table_name = integration_params["table_name"] |
|
|
|
|
|
@lru_cache(maxsize=16) |
|
def _get_traceback_str_for_error(error_str: str) -> str: |
|
""" |
|
function wrapped with lru_cache to limit the number of times `traceback.format_exc()` is called |
|
""" |
|
return traceback.format_exc() |
|
|
|
|
|
from decimal import Decimal |
|
|
|
|
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
|
|
def create_dummy_standard_logging_payload() -> StandardLoggingPayload: |
|
|
|
model_info = StandardLoggingModelInformation( |
|
model_map_key="gpt-3.5-turbo", model_map_value=None |
|
) |
|
|
|
metadata = StandardLoggingMetadata( |
|
user_api_key_hash=str("test_hash"), |
|
user_api_key_alias=str("test_alias"), |
|
user_api_key_team_id=str("test_team"), |
|
user_api_key_user_id=str("test_user"), |
|
user_api_key_team_alias=str("test_team_alias"), |
|
user_api_key_org_id=None, |
|
spend_logs_metadata=None, |
|
requester_ip_address=str("127.0.0.1"), |
|
requester_metadata=None, |
|
user_api_key_end_user_id=str("test_end_user"), |
|
) |
|
|
|
hidden_params = StandardLoggingHiddenParams( |
|
model_id=None, |
|
cache_key=None, |
|
api_base=None, |
|
response_cost=None, |
|
additional_headers=None, |
|
litellm_overhead_time_ms=None, |
|
) |
|
|
|
|
|
response_cost = Decimal("0.1") |
|
start_time = Decimal("1234567890.0") |
|
end_time = Decimal("1234567891.0") |
|
completion_start_time = Decimal("1234567890.5") |
|
saved_cache_cost = Decimal("0.0") |
|
|
|
|
|
messages: List[Dict[str, str]] = [{"role": "user", "content": "Hello, world!"}] |
|
response: Dict[str, List[Dict[str, Dict[str, str]]]] = { |
|
"choices": [{"message": {"content": "Hi there!"}}] |
|
} |
|
|
|
|
|
return StandardLoggingPayload( |
|
id=str("test_id"), |
|
call_type=str("completion"), |
|
stream=bool(False), |
|
response_cost=response_cost, |
|
response_cost_failure_debug_info=None, |
|
status=str("success"), |
|
total_tokens=int(30), |
|
prompt_tokens=int(20), |
|
completion_tokens=int(10), |
|
startTime=start_time, |
|
endTime=end_time, |
|
completionStartTime=completion_start_time, |
|
model_map_information=model_info, |
|
model=str("gpt-3.5-turbo"), |
|
model_id=str("model-123"), |
|
model_group=str("openai-gpt"), |
|
custom_llm_provider=str("openai"), |
|
api_base=str("https://api.openai.com"), |
|
metadata=metadata, |
|
cache_hit=bool(False), |
|
cache_key=None, |
|
saved_cache_cost=saved_cache_cost, |
|
request_tags=[], |
|
end_user=None, |
|
requester_ip_address=str("127.0.0.1"), |
|
messages=messages, |
|
response=response, |
|
error_str=None, |
|
model_parameters={"stream": True}, |
|
hidden_params=hidden_params, |
|
) |
|
|