# What is this?
## Common Utility file for Logging handler
# Logging function -> log the exact model details + what's being sent | Non-Blocking
import copy
import datetime
import json
import os
import re
import subprocess
import sys
import time
import traceback
import uuid
from datetime import datetime as dt_object
from functools import lru_cache
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast

from pydantic import BaseModel

import litellm
from litellm import (
    _custom_logger_compatible_callbacks_literal,
    json_logs,
    log_raw_request_response,
    turn_off_message_logging,
)
from litellm._logging import _is_debugging_on, verbose_logger
from litellm.caching.caching import DualCache, InMemoryCache
from litellm.caching.caching_handler import LLMCachingHandler
from litellm.cost_calculator import _select_model_name_for_cost_calc
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.mlflow import MlflowLogger
from litellm.integrations.pagerduty.pagerduty import PagerDutyAlerting
from litellm.litellm_core_utils.get_litellm_params import get_litellm_params
from litellm.litellm_core_utils.redact_messages import (
    redact_message_input_output_from_custom_logger,
    redact_message_input_output_from_logging,
)
from litellm.types.llms.openai import (
    AllMessageValues,
    Batch,
    FineTuningJob,
    HttpxBinaryResponseContent,
)
from litellm.types.rerank import RerankResponse
from litellm.types.router import SPECIAL_MODEL_INFO_PARAMS
from litellm.types.utils import (
    CallTypes,
    EmbeddingResponse,
    ImageResponse,
    LiteLLMLoggingBaseClass,
    ModelResponse,
    ModelResponseStream,
    StandardCallbackDynamicParams,
    StandardLoggingAdditionalHeaders,
    StandardLoggingHiddenParams,
    StandardLoggingMetadata,
    StandardLoggingModelCostFailureDebugInformation,
    StandardLoggingModelInformation,
    StandardLoggingPayload,
    StandardLoggingPayloadErrorInformation,
    StandardLoggingPayloadStatus,
    StandardLoggingPromptManagementMetadata,
    TextCompletionResponse,
    TranscriptionResponse,
    Usage,
)
from litellm.utils import _get_base_model_from_metadata, executor, print_verbose

from ..integrations.argilla import ArgillaLogger
from ..integrations.arize_ai import ArizeLogger
from ..integrations.athina import AthinaLogger
from ..integrations.azure_storage.azure_storage import AzureBlobStorageLogger
from ..integrations.braintrust_logging import BraintrustLogger
from ..integrations.datadog.datadog import DataDogLogger
from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger
from ..integrations.dynamodb import DyanmoDBLogger
from ..integrations.galileo import GalileoObserve
from ..integrations.gcs_bucket.gcs_bucket import GCSBucketLogger
from ..integrations.gcs_pubsub.pub_sub import GcsPubSubLogger
from ..integrations.greenscale import GreenscaleLogger
from ..integrations.helicone import HeliconeLogger
from ..integrations.humanloop import HumanloopLogger
from ..integrations.lago import LagoLogger
from ..integrations.langfuse.langfuse import LangFuseLogger
from ..integrations.langfuse.langfuse_handler import LangFuseHandler
from ..integrations.langfuse.langfuse_prompt_management import LangfusePromptManagement
from ..integrations.langsmith import LangsmithLogger
from ..integrations.literal_ai import LiteralAILogger
from ..integrations.logfire_logger import LogfireLevel, LogfireLogger
from ..integrations.lunary import LunaryLogger
from ..integrations.openmeter import OpenMeterLogger
from ..integrations.opik.opik import
OpikLogger from ..integrations.prometheus import PrometheusLogger from ..integrations.prompt_layer import PromptLayerLogger from ..integrations.s3 import S3Logger from ..integrations.supabase import Supabase from ..integrations.traceloop import TraceloopLogger from ..integrations.weights_biases import WeightsBiasesLogger from .exception_mapping_utils import _get_response_headers from .initialize_dynamic_callback_params import ( initialize_standard_callback_dynamic_params as _initialize_standard_callback_dynamic_params, ) from .logging_utils import _assemble_complete_response_from_streaming_chunks from .specialty_caches.dynamic_logging_cache import DynamicLoggingCache try: from ..proxy.enterprise.enterprise_callbacks.generic_api_callback import ( GenericAPILogger, ) except Exception as e: verbose_logger.debug( f"[Non-Blocking] Unable to import GenericAPILogger - LiteLLM Enterprise Feature - {str(e)}" ) _in_memory_loggers: List[Any] = [] ### GLOBAL VARIABLES ### sentry_sdk_instance = None capture_exception = None add_breadcrumb = None posthog = None slack_app = None alerts_channel = None heliconeLogger = None athinaLogger = None promptLayerLogger = None logfireLogger = None weightsBiasesLogger = None customLogger = None langFuseLogger = None openMeterLogger = None lagoLogger = None dataDogLogger = None prometheusLogger = None dynamoLogger = None s3Logger = None genericAPILogger = None greenscaleLogger = None lunaryLogger = None supabaseClient = None callback_list: Optional[List[str]] = [] user_logger_fn = None additional_details: Optional[Dict[str, str]] = {} local_cache: Optional[Dict[str, str]] = {} last_fetched_at = None last_fetched_at_keys = None #### class ServiceTraceIDCache: def __init__(self) -> None: self.cache = InMemoryCache() def get_cache(self, litellm_call_id: str, service_name: str) -> Optional[str]: key_name = "{}:{}".format(service_name, litellm_call_id) response = self.cache.get_cache(key=key_name) return response def set_cache(self, litellm_call_id: str, service_name: str, trace_id: str) -> None: key_name = "{}:{}".format(service_name, litellm_call_id) self.cache.set_cache(key=key_name, value=trace_id) return None in_memory_trace_id_cache = ServiceTraceIDCache() in_memory_dynamic_logger_cache = DynamicLoggingCache() class Logging(LiteLLMLoggingBaseClass): global supabaseClient, promptLayerLogger, weightsBiasesLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, logfireLogger, prometheusLogger, slack_app custom_pricing: bool = False stream_options = None def __init__( self, model: str, messages, stream, call_type, start_time, litellm_call_id: str, function_id: str, litellm_trace_id: Optional[str] = None, dynamic_input_callbacks: Optional[ List[Union[str, Callable, CustomLogger]] ] = None, dynamic_success_callbacks: Optional[ List[Union[str, Callable, CustomLogger]] ] = None, dynamic_async_success_callbacks: Optional[ List[Union[str, Callable, CustomLogger]] ] = None, dynamic_failure_callbacks: Optional[ List[Union[str, Callable, CustomLogger]] ] = None, dynamic_async_failure_callbacks: Optional[ List[Union[str, Callable, CustomLogger]] ] = None, kwargs: Optional[Dict] = None, ): _input: Optional[str] = messages # save original value of messages if messages is not None: if isinstance(messages, str): messages = [ {"role": "user", "content": messages} ] # convert text completion input to the chat completion format elif ( isinstance(messages, list) and len(messages) > 0 and isinstance(messages[0], str) ): new_messages = [] for m in messages: 
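                    # Normalize each plain string into a chat-format user message.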
                    new_messages.append({"role": "user", "content": m})
                messages = new_messages
        self.model = model
        self.messages = copy.deepcopy(messages)
        self.stream = stream
        self.start_time = start_time  # log the call start time
        self.call_type = call_type
        self.litellm_call_id = litellm_call_id
        self.litellm_trace_id = litellm_trace_id
        self.function_id = function_id
        self.streaming_chunks: List[Any] = []  # for generating complete stream response
        self.sync_streaming_chunks: List[Any] = (
            []
        )  # for generating complete stream response

        # Initialize dynamic callbacks
        self.dynamic_input_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_input_callbacks
        self.dynamic_success_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_success_callbacks
        self.dynamic_async_success_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_async_success_callbacks
        self.dynamic_failure_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_failure_callbacks
        self.dynamic_async_failure_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_async_failure_callbacks

        # Process dynamic callbacks
        self.process_dynamic_callbacks()

        ## DYNAMIC LANGFUSE / GCS / logging callback KEYS ##
        self.standard_callback_dynamic_params: StandardCallbackDynamicParams = (
            self.initialize_standard_callback_dynamic_params(kwargs)
        )

        ## TIME TO FIRST TOKEN LOGGING ##
        self.completion_start_time: Optional[datetime.datetime] = None
        self._llm_caching_handler: Optional[LLMCachingHandler] = None

        # INITIAL LITELLM_PARAMS
        litellm_params = {}
        if kwargs is not None:
            litellm_params = get_litellm_params(**kwargs)
            litellm_params = scrub_sensitive_keys_in_metadata(litellm_params)

        self.litellm_params = litellm_params

        self.model_call_details: Dict[str, Any] = {
            "litellm_trace_id": litellm_trace_id,
            "litellm_call_id": litellm_call_id,
            "input": _input,
            "litellm_params": litellm_params,
        }

    def process_dynamic_callbacks(self):
        """
        Initializes CustomLogger compatible callbacks in self.dynamic_* callbacks

        If a callback is in litellm._known_custom_logger_compatible_callbacks, it needs to be initialized and added to the respective dynamic_* callback list.
        """
        # Process input callbacks
        self.dynamic_input_callbacks = self._process_dynamic_callback_list(
            self.dynamic_input_callbacks, dynamic_callbacks_type="input"
        )

        # Process failure callbacks
        self.dynamic_failure_callbacks = self._process_dynamic_callback_list(
            self.dynamic_failure_callbacks, dynamic_callbacks_type="failure"
        )

        # Process async failure callbacks
        self.dynamic_async_failure_callbacks = self._process_dynamic_callback_list(
            self.dynamic_async_failure_callbacks, dynamic_callbacks_type="async_failure"
        )

        # Process success callbacks
        self.dynamic_success_callbacks = self._process_dynamic_callback_list(
            self.dynamic_success_callbacks, dynamic_callbacks_type="success"
        )

        # Process async success callbacks
        self.dynamic_async_success_callbacks = self._process_dynamic_callback_list(
            self.dynamic_async_success_callbacks, dynamic_callbacks_type="async_success"
        )

    def _process_dynamic_callback_list(
        self,
        callback_list: Optional[List[Union[str, Callable, CustomLogger]]],
        dynamic_callbacks_type: Literal[
            "input", "success", "failure", "async_success", "async_failure"
        ],
    ) -> Optional[List[Union[str, Callable, CustomLogger]]]:
        """
        Helper function to initialize CustomLogger compatible callbacks in self.dynamic_* callbacks

        - If a callback is in litellm._known_custom_logger_compatible_callbacks, replace the string with the initialized callback class.
- If dynamic callback is a "success" callback that is a known_custom_logger_compatible_callbacks then add it to dynamic_async_success_callbacks - If dynamic callback is a "failure" callback that is a known_custom_logger_compatible_callbacks then add it to dynamic_failure_callbacks """ if callback_list is None: return None processed_list: List[Union[str, Callable, CustomLogger]] = [] for callback in callback_list: if ( isinstance(callback, str) and callback in litellm._known_custom_logger_compatible_callbacks ): callback_class = _init_custom_logger_compatible_class( callback, internal_usage_cache=None, llm_router=None # type: ignore ) if callback_class is not None: processed_list.append(callback_class) # If processing dynamic_success_callbacks, add to dynamic_async_success_callbacks if dynamic_callbacks_type == "success": if self.dynamic_async_success_callbacks is None: self.dynamic_async_success_callbacks = [] self.dynamic_async_success_callbacks.append(callback_class) elif dynamic_callbacks_type == "failure": if self.dynamic_async_failure_callbacks is None: self.dynamic_async_failure_callbacks = [] self.dynamic_async_failure_callbacks.append(callback_class) else: processed_list.append(callback) return processed_list def initialize_standard_callback_dynamic_params( self, kwargs: Optional[Dict] = None ) -> StandardCallbackDynamicParams: """ Initialize the standard callback dynamic params from the kwargs checks if langfuse_secret_key, gcs_bucket_name in kwargs and sets the corresponding attributes in StandardCallbackDynamicParams """ return _initialize_standard_callback_dynamic_params(kwargs) def update_environment_variables( self, litellm_params: Dict, optional_params: Dict, model: Optional[str] = None, user: Optional[str] = None, **additional_params, ): self.optional_params = optional_params if model is not None: self.model = model self.user = user self.litellm_params = { **self.litellm_params, **scrub_sensitive_keys_in_metadata(litellm_params), } self.logger_fn = litellm_params.get("logger_fn", None) verbose_logger.debug(f"self.optional_params: {self.optional_params}") self.model_call_details.update( { "model": self.model, "messages": self.messages, "optional_params": self.optional_params, "litellm_params": self.litellm_params, "start_time": self.start_time, "stream": self.stream, "user": user, "call_type": str(self.call_type), "litellm_call_id": self.litellm_call_id, "completion_start_time": self.completion_start_time, "standard_callback_dynamic_params": self.standard_callback_dynamic_params, **self.optional_params, **additional_params, } ) ## check if stream options is set ## - used by CustomStreamWrapper for easy instrumentation if "stream_options" in additional_params: self.stream_options = additional_params["stream_options"] ## check if custom pricing set ## if ( litellm_params.get("input_cost_per_token") is not None or litellm_params.get("input_cost_per_second") is not None or litellm_params.get("output_cost_per_token") is not None or litellm_params.get("output_cost_per_second") is not None ): self.custom_pricing = True if "custom_llm_provider" in self.model_call_details: self.custom_llm_provider = self.model_call_details["custom_llm_provider"] def get_chat_completion_prompt( self, model: str, messages: List[AllMessageValues], non_default_params: dict, prompt_id: str, prompt_variables: Optional[dict], ) -> Tuple[str, List[AllMessageValues], dict]: for ( custom_logger_compatible_callback ) in litellm._known_custom_logger_compatible_callbacks: if 
model.startswith(custom_logger_compatible_callback):
                custom_logger = _init_custom_logger_compatible_class(
                    logging_integration=custom_logger_compatible_callback,
                    internal_usage_cache=None,
                    llm_router=None,
                )
                if custom_logger is None:
                    continue
                old_name = model

                model, messages, non_default_params = (
                    custom_logger.get_chat_completion_prompt(
                        model=model,
                        messages=messages,
                        non_default_params=non_default_params,
                        prompt_id=prompt_id,
                        prompt_variables=prompt_variables,
                        dynamic_callback_params=self.standard_callback_dynamic_params,
                    )
                )
                self.model_call_details["prompt_integration"] = old_name.split("/")[0]
        self.messages = messages

        return model, messages, non_default_params

    def _pre_call(self, input, api_key, model=None, additional_args={}):
        """
        Common helper function across the sync + async pre-call function
        """
        self.model_call_details["input"] = input
        self.model_call_details["api_key"] = api_key
        self.model_call_details["additional_args"] = additional_args
        self.model_call_details["log_event_type"] = "pre_api_call"
        if (
            model
        ):  # if model name was changed pre-call, overwrite the initial model call name with the new one
            self.model_call_details["model"] = model

    def pre_call(self, input, api_key, model=None, additional_args={}):  # noqa: PLR0915
        # Log the exact input to the LLM API
        litellm.error_logs["PRE_CALL"] = locals()
        try:
            self._pre_call(
                input=input,
                api_key=api_key,
                model=model,
                additional_args=additional_args,
            )

            # User Logging -> if you pass in a custom logging function
            self._print_llm_call_debugging_log(
                api_base=additional_args.get("api_base", ""),
                headers=additional_args.get("headers", {}),
                additional_args=additional_args,
            )

            # log raw request to provider (like LangFuse) -- if opted in.
            if log_raw_request_response is True:
                _litellm_params = self.model_call_details.get("litellm_params", {})
                _metadata = _litellm_params.get("metadata", {}) or {}
                try:
                    # [Non-blocking Extra Debug Information in metadata]
                    if (
                        turn_off_message_logging is not None
                        and turn_off_message_logging is True
                    ):
                        _metadata["raw_request"] = (
                            "redacted by litellm.
\ 'litellm.turn_off_message_logging=True'" ) else: curl_command = self._get_request_curl_command( api_base=additional_args.get("api_base", ""), headers=additional_args.get("headers", {}), additional_args=additional_args, data=additional_args.get("complete_input_dict", {}), ) _metadata["raw_request"] = str(curl_command) except Exception as e: _metadata["raw_request"] = ( "Unable to Log \ raw request: {}".format( str(e) ) ) if self.logger_fn and callable(self.logger_fn): try: self.logger_fn( self.model_call_details ) # Expectation: any logger function passed in by the user should accept a dict object except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( str(e) ) ) self.model_call_details["api_call_start_time"] = datetime.datetime.now() # Input Integration Logging -> If you want to log the fact that an attempt to call the model was made callbacks = litellm.input_callback + (self.dynamic_input_callbacks or []) for callback in callbacks: try: if callback == "supabase" and supabaseClient is not None: verbose_logger.debug("reaches supabase for logging!") model = self.model_call_details["model"] messages = self.model_call_details["input"] verbose_logger.debug(f"supabaseClient: {supabaseClient}") supabaseClient.input_log_event( model=model, messages=messages, end_user=self.model_call_details.get("user", "default"), litellm_call_id=self.litellm_params["litellm_call_id"], print_verbose=print_verbose, ) elif callback == "sentry" and add_breadcrumb: try: details_to_log = copy.deepcopy(self.model_call_details) except Exception: details_to_log = self.model_call_details if litellm.turn_off_message_logging: # make a copy of the _model_Call_details and log it details_to_log.pop("messages", None) details_to_log.pop("input", None) details_to_log.pop("prompt", None) add_breadcrumb( category="litellm.llm_call", message=f"Model Call Details pre-call: {details_to_log}", level="info", ) elif isinstance(callback, CustomLogger): # custom logger class callback.log_pre_api_call( model=self.model, messages=self.messages, kwargs=self.model_call_details, ) elif ( callable(callback) and customLogger is not None ): # custom logger functions customLogger.log_input_event( model=self.model, messages=self.messages, kwargs=self.model_call_details, print_verbose=print_verbose, callback_func=callback, ) except Exception as e: verbose_logger.exception( "litellm.Logging.pre_call(): Exception occured - {}".format( str(e) ) ) verbose_logger.debug( f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" ) if capture_exception: # log this error to sentry for debugging capture_exception(e) except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( str(e) ) ) verbose_logger.error( f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" ) if capture_exception: # log this error to sentry for debugging capture_exception(e) def _print_llm_call_debugging_log( self, api_base: str, headers: dict, additional_args: dict, ): """ Internal debugging helper function Prints the RAW curl command sent from LiteLLM """ if _is_debugging_on(): if json_logs: masked_headers = self._get_masked_headers(headers) verbose_logger.debug( "POST Request Sent from LiteLLM", extra={"api_base": {api_base}, **masked_headers}, ) else: headers = additional_args.get("headers", {}) if headers is None: headers = {} data = additional_args.get("complete_input_dict", {}) api_base = 
str(additional_args.get("api_base", "")) if "key=" in api_base: # Find the position of "key=" in the string key_index = api_base.find("key=") + 4 # Mask the last 5 characters after "key=" masked_api_base = api_base[:key_index] + "*" * 5 + api_base[-4:] else: masked_api_base = api_base self.model_call_details["litellm_params"]["api_base"] = masked_api_base curl_command = self._get_request_curl_command( api_base=api_base, headers=headers, additional_args=additional_args, data=data, ) verbose_logger.debug(f"\033[92m{curl_command}\033[0m\n") def _get_request_curl_command( self, api_base: str, headers: dict, additional_args: dict, data: dict ) -> str: curl_command = "\n\nPOST Request Sent from LiteLLM:\n" curl_command += "curl -X POST \\\n" curl_command += f"{api_base} \\\n" masked_headers = self._get_masked_headers(headers) formatted_headers = " ".join( [f"-H '{k}: {v}'" for k, v in masked_headers.items()] ) curl_command += ( f"{formatted_headers} \\\n" if formatted_headers.strip() != "" else "" ) curl_command += f"-d '{str(data)}'\n" if additional_args.get("request_str", None) is not None: # print the sagemaker / bedrock client request curl_command = "\nRequest Sent from LiteLLM:\n" curl_command += additional_args.get("request_str", None) elif api_base == "": curl_command = str(self.model_call_details) return curl_command def _get_masked_headers(self, headers: dict): """ Internal debugging helper function Masks the headers of the request sent from LiteLLM """ return { k: ( (v[:-44] + "*" * 44) if (isinstance(v, str) and len(v) > 44) else "*****" ) for k, v in headers.items() } def post_call( self, original_response, input=None, api_key=None, additional_args={} ): # Log the exact result from the LLM API, for streaming - log the type of response received litellm.error_logs["POST_CALL"] = locals() if isinstance(original_response, dict): original_response = json.dumps(original_response) try: self.model_call_details["input"] = input self.model_call_details["api_key"] = api_key self.model_call_details["original_response"] = original_response self.model_call_details["additional_args"] = additional_args self.model_call_details["log_event_type"] = "post_api_call" if json_logs: verbose_logger.debug( "RAW RESPONSE:\n{}\n\n".format( self.model_call_details.get( "original_response", self.model_call_details ) ), ) else: print_verbose( "RAW RESPONSE:\n{}\n\n".format( self.model_call_details.get( "original_response", self.model_call_details ) ) ) if self.logger_fn and callable(self.logger_fn): try: self.logger_fn( self.model_call_details ) # Expectation: any logger function passed in by the user should accept a dict object except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( str(e) ) ) original_response = redact_message_input_output_from_logging( model_call_details=( self.model_call_details if hasattr(self, "model_call_details") else {} ), result=original_response, ) # Input Integration Logging -> If you want to log the fact that an attempt to call the model was made callbacks = litellm.input_callback + (self.dynamic_input_callbacks or []) for callback in callbacks: try: if callback == "sentry" and add_breadcrumb: verbose_logger.debug("reaches sentry breadcrumbing") try: details_to_log = copy.deepcopy(self.model_call_details) except Exception: details_to_log = self.model_call_details if litellm.turn_off_message_logging: # make a copy of the _model_Call_details and log it details_to_log.pop("messages", None) 
details_to_log.pop("input", None) details_to_log.pop("prompt", None) add_breadcrumb( category="litellm.llm_call", message=f"Model Call Details post-call: {details_to_log}", level="info", ) elif isinstance(callback, CustomLogger): # custom logger class callback.log_post_api_call( kwargs=self.model_call_details, response_obj=None, start_time=self.start_time, end_time=None, ) except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {}".format( str(e) ) ) verbose_logger.debug( f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" ) if capture_exception: # log this error to sentry for debugging capture_exception(e) except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format( str(e) ) ) def get_response_ms(self) -> float: return ( self.model_call_details.get("end_time", datetime.datetime.now()) - self.model_call_details.get("start_time", datetime.datetime.now()) ).total_seconds() * 1000 def _response_cost_calculator( self, result: Union[ ModelResponse, ModelResponseStream, EmbeddingResponse, ImageResponse, TranscriptionResponse, TextCompletionResponse, HttpxBinaryResponseContent, RerankResponse, Batch, FineTuningJob, ], cache_hit: Optional[bool] = None, ) -> Optional[float]: """ Calculate response cost using result + logging object variables. used for consistent cost calculation across response headers + logging integrations. """ ## RESPONSE COST ## custom_pricing = use_custom_pricing_for_model( litellm_params=( self.litellm_params if hasattr(self, "litellm_params") else None ) ) prompt = "" # use for tts cost calc _input = self.model_call_details.get("input", None) if _input is not None and isinstance(_input, str): prompt = _input if cache_hit is None: cache_hit = self.model_call_details.get("cache_hit", False) try: response_cost_calculator_kwargs = { "response_object": result, "model": self.model, "cache_hit": cache_hit, "custom_llm_provider": self.model_call_details.get( "custom_llm_provider", None ), "base_model": _get_base_model_from_metadata( model_call_details=self.model_call_details ), "call_type": self.call_type, "optional_params": self.optional_params, "custom_pricing": custom_pricing, "prompt": prompt, } except Exception as e: # error creating kwargs for cost calculation debug_info = StandardLoggingModelCostFailureDebugInformation( error_str=str(e), traceback_str=traceback.format_exc(), ) verbose_logger.debug( f"response_cost_failure_debug_information: {debug_info}" ) self.model_call_details["response_cost_failure_debug_information"] = ( debug_info ) return None try: response_cost = litellm.response_cost_calculator( **response_cost_calculator_kwargs ) verbose_logger.debug(f"response_cost: {response_cost}") return response_cost except Exception as e: # error calculating cost debug_info = StandardLoggingModelCostFailureDebugInformation( error_str=str(e), traceback_str=_get_traceback_str_for_error(str(e)), model=response_cost_calculator_kwargs["model"], cache_hit=response_cost_calculator_kwargs["cache_hit"], custom_llm_provider=response_cost_calculator_kwargs[ "custom_llm_provider" ], base_model=response_cost_calculator_kwargs["base_model"], call_type=response_cost_calculator_kwargs["call_type"], custom_pricing=response_cost_calculator_kwargs["custom_pricing"], ) verbose_logger.debug( f"response_cost_failure_debug_information: {debug_info}" ) 
self.model_call_details["response_cost_failure_debug_information"] = ( debug_info ) return None def should_run_callback( self, callback: litellm.CALLBACK_TYPES, litellm_params: dict, event_hook: str ) -> bool: if litellm.global_disable_no_log_param: return True if litellm_params.get("no-log", False) is True: # proxy cost tracking cal backs should run if not ( isinstance(callback, CustomLogger) and "_PROXY_" in callback.__class__.__name__ ): verbose_logger.debug( f"no-log request, skipping logging for {event_hook} event" ) return False return True def _success_handler_helper_fn( self, result=None, start_time=None, end_time=None, cache_hit=None, standard_logging_object: Optional[StandardLoggingPayload] = None, ): try: if start_time is None: start_time = self.start_time if end_time is None: end_time = datetime.datetime.now() if self.completion_start_time is None: self.completion_start_time = end_time self.model_call_details["completion_start_time"] = ( self.completion_start_time ) self.model_call_details["log_event_type"] = "successful_api_call" self.model_call_details["end_time"] = end_time self.model_call_details["cache_hit"] = cache_hit ## if model in model cost map - log the response cost ## else set cost to None if ( standard_logging_object is None and result is not None and self.stream is not True ): # handle streaming separately if ( isinstance(result, ModelResponse) or isinstance(result, ModelResponseStream) or isinstance(result, EmbeddingResponse) or isinstance(result, ImageResponse) or isinstance(result, TranscriptionResponse) or isinstance(result, TextCompletionResponse) or isinstance(result, HttpxBinaryResponseContent) # tts or isinstance(result, RerankResponse) or isinstance(result, Batch) or isinstance(result, FineTuningJob) ): ## HIDDEN PARAMS ## hidden_params = getattr(result, "_hidden_params", {}) if hidden_params: # add to metadata for logging if self.model_call_details.get("litellm_params") is not None: self.model_call_details["litellm_params"].setdefault( "metadata", {} ) if ( self.model_call_details["litellm_params"]["metadata"] is None ): self.model_call_details["litellm_params"][ "metadata" ] = {} self.model_call_details["litellm_params"]["metadata"][ # type: ignore "hidden_params" ] = getattr( result, "_hidden_params", {} ) ## RESPONSE COST - Only calculate if not in hidden_params ## if "response_cost" in hidden_params: self.model_call_details["response_cost"] = hidden_params[ "response_cost" ] else: self.model_call_details["response_cost"] = ( self._response_cost_calculator(result=result) ) ## STANDARDIZED LOGGING PAYLOAD self.model_call_details["standard_logging_object"] = ( get_standard_logging_object_payload( kwargs=self.model_call_details, init_response_obj=result, start_time=start_time, end_time=end_time, logging_obj=self, status="success", ) ) elif isinstance(result, dict): # pass-through endpoints ## STANDARDIZED LOGGING PAYLOAD self.model_call_details["standard_logging_object"] = ( get_standard_logging_object_payload( kwargs=self.model_call_details, init_response_obj=result, start_time=start_time, end_time=end_time, logging_obj=self, status="success", ) ) elif standard_logging_object is not None: self.model_call_details["standard_logging_object"] = ( standard_logging_object ) else: # streaming chunks + image gen. 
self.model_call_details["response_cost"] = None if ( litellm.max_budget and self.stream is False and result is not None and isinstance(result, dict) and "content" in result ): time_diff = (end_time - start_time).total_seconds() float_diff = float(time_diff) litellm._current_cost += litellm.completion_cost( model=self.model, prompt="", completion=getattr(result, "content", ""), total_time=float_diff, ) return start_time, end_time, result except Exception as e: raise Exception(f"[Non-Blocking] LiteLLM.Success_Call Error: {str(e)}") def success_handler( # noqa: PLR0915 self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs ): verbose_logger.debug( f"Logging Details LiteLLM-Success Call: Cache_hit={cache_hit}" ) start_time, end_time, result = self._success_handler_helper_fn( start_time=start_time, end_time=end_time, result=result, cache_hit=cache_hit, standard_logging_object=kwargs.get("standard_logging_object", None), ) try: ## BUILD COMPLETE STREAMED RESPONSE complete_streaming_response: Optional[ Union[ModelResponse, TextCompletionResponse] ] = None if "complete_streaming_response" in self.model_call_details: return # break out of this. complete_streaming_response = self._get_assembled_streaming_response( result=result, start_time=start_time, end_time=end_time, is_async=False, streaming_chunks=self.sync_streaming_chunks, ) if complete_streaming_response is not None: verbose_logger.debug( "Logging Details LiteLLM-Success Call streaming complete" ) self.model_call_details["complete_streaming_response"] = ( complete_streaming_response ) self.model_call_details["response_cost"] = ( self._response_cost_calculator(result=complete_streaming_response) ) ## STANDARDIZED LOGGING PAYLOAD self.model_call_details["standard_logging_object"] = ( get_standard_logging_object_payload( kwargs=self.model_call_details, init_response_obj=complete_streaming_response, start_time=start_time, end_time=end_time, logging_obj=self, status="success", ) ) callbacks = self.get_combined_callback_list( dynamic_success_callbacks=self.dynamic_success_callbacks, global_callbacks=litellm.success_callback, ) ## REDACT MESSAGES ## result = redact_message_input_output_from_logging( model_call_details=( self.model_call_details if hasattr(self, "model_call_details") else {} ), result=result, ) ## LOGGING HOOK ## for callback in callbacks: if isinstance(callback, CustomLogger): self.model_call_details, result = callback.logging_hook( kwargs=self.model_call_details, result=result, call_type=self.call_type, ) for callback in callbacks: try: litellm_params = self.model_call_details.get("litellm_params", {}) should_run = self.should_run_callback( callback=callback, litellm_params=litellm_params, event_hook="success_handler", ) if not should_run: continue if callback == "promptlayer" and promptLayerLogger is not None: print_verbose("reaches promptlayer for logging!") promptLayerLogger.log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) if callback == "supabase" and supabaseClient is not None: print_verbose("reaches supabase for logging!") kwargs = self.model_call_details # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: if "complete_streaming_response" not in kwargs: continue else: print_verbose("reaches supabase for streaming logging!") result = kwargs["complete_streaming_response"] model = kwargs["model"] messages = kwargs["messages"] optional_params = 
kwargs.get("optional_params", {}) litellm_params = kwargs.get("litellm_params", {}) supabaseClient.log_event( model=model, messages=messages, end_user=optional_params.get("user", "default"), response_obj=result, start_time=start_time, end_time=end_time, litellm_call_id=litellm_params.get( "litellm_call_id", str(uuid.uuid4()) ), print_verbose=print_verbose, ) if callback == "wandb" and weightsBiasesLogger is not None: print_verbose("reaches wandb for logging!") weightsBiasesLogger.log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) if callback == "logfire" and logfireLogger is not None: verbose_logger.debug("reaches logfire for success logging!") kwargs = {} for k, v in self.model_call_details.items(): if ( k != "original_response" ): # copy.deepcopy raises errors as this could be a coroutine kwargs[k] = v # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: if "complete_streaming_response" not in kwargs: continue else: print_verbose("reaches logfire for streaming logging!") result = kwargs["complete_streaming_response"] logfireLogger.log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, level=LogfireLevel.INFO.value, # type: ignore ) if callback == "lunary" and lunaryLogger is not None: print_verbose("reaches lunary for logging!") model = self.model kwargs = self.model_call_details input = kwargs.get("messages", kwargs.get("input", None)) type = ( "embed" if self.call_type == CallTypes.embedding.value else "llm" ) # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: if "complete_streaming_response" not in kwargs: continue else: result = kwargs["complete_streaming_response"] lunaryLogger.log_event( type=type, kwargs=kwargs, event="end", model=model, input=input, user_id=kwargs.get("user", None), # user_props=self.model_call_details.get("user_props", None), extra=kwargs.get("optional_params", {}), response_obj=result, start_time=start_time, end_time=end_time, run_id=self.litellm_call_id, print_verbose=print_verbose, ) if callback == "helicone" and heliconeLogger is not None: print_verbose("reaches helicone for logging!") model = self.model messages = self.model_call_details["input"] kwargs = self.model_call_details # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: if "complete_streaming_response" not in kwargs: continue else: print_verbose("reaches helicone for streaming logging!") result = kwargs["complete_streaming_response"] heliconeLogger.log_success( model=model, messages=messages, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, kwargs=kwargs, ) if callback == "langfuse": global langFuseLogger print_verbose("reaches langfuse for success logging!") kwargs = {} for k, v in self.model_call_details.items(): if ( k != "original_response" ): # copy.deepcopy raises errors as this could be a coroutine kwargs[k] = v # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: verbose_logger.debug( f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" ) if complete_streaming_response is None: continue else: print_verbose("reaches langfuse for streaming logging!") result = kwargs["complete_streaming_response"] langfuse_logger_to_use = 
LangFuseHandler.get_langfuse_logger_for_request( globalLangfuseLogger=langFuseLogger, standard_callback_dynamic_params=self.standard_callback_dynamic_params, in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, ) if langfuse_logger_to_use is not None: _response = langfuse_logger_to_use._old_log_event( kwargs=kwargs, response_obj=result, start_time=start_time, end_time=end_time, user_id=kwargs.get("user", None), print_verbose=print_verbose, ) if _response is not None and isinstance(_response, dict): _trace_id = _response.get("trace_id", None) if _trace_id is not None: in_memory_trace_id_cache.set_cache( litellm_call_id=self.litellm_call_id, service_name="langfuse", trace_id=_trace_id, ) if callback == "generic": global genericAPILogger verbose_logger.debug("reaches langfuse for success logging!") kwargs = {} for k, v in self.model_call_details.items(): if ( k != "original_response" ): # copy.deepcopy raises errors as this could be a coroutine kwargs[k] = v # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: verbose_logger.debug( f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" ) if complete_streaming_response is None: continue else: print_verbose("reaches langfuse for streaming logging!") result = kwargs["complete_streaming_response"] if genericAPILogger is None: genericAPILogger = GenericAPILogger() # type: ignore genericAPILogger.log_event( kwargs=kwargs, response_obj=result, start_time=start_time, end_time=end_time, user_id=kwargs.get("user", None), print_verbose=print_verbose, ) if callback == "greenscale" and greenscaleLogger is not None: kwargs = {} for k, v in self.model_call_details.items(): if ( k != "original_response" ): # copy.deepcopy raises errors as this could be a coroutine kwargs[k] = v # this only logs streaming once, complete_streaming_response exists i.e when stream ends if self.stream: verbose_logger.debug( f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" ) if complete_streaming_response is None: continue else: print_verbose( "reaches greenscale for streaming logging!" 
) result = kwargs["complete_streaming_response"] greenscaleLogger.log_event( kwargs=kwargs, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) if callback == "athina" and athinaLogger is not None: deep_copy = {} for k, v in self.model_call_details.items(): deep_copy[k] = v athinaLogger.log_event( kwargs=deep_copy, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) if callback == "traceloop": deep_copy = {} for k, v in self.model_call_details.items(): if k != "original_response": deep_copy[k] = v traceloopLogger.log_event( kwargs=deep_copy, response_obj=result, start_time=start_time, end_time=end_time, user_id=kwargs.get("user", None), print_verbose=print_verbose, ) if callback == "s3": global s3Logger if s3Logger is None: s3Logger = S3Logger() if self.stream: if "complete_streaming_response" in self.model_call_details: print_verbose( "S3Logger Logger: Got Stream Event - Completed Stream Response" ) s3Logger.log_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ "complete_streaming_response" ], start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) else: print_verbose( "S3Logger Logger: Got Stream Event - No complete stream response as yet" ) else: s3Logger.log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) if ( callback == "openmeter" and self.model_call_details.get("litellm_params", {}).get( "acompletion", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aembedding", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aimage_generation", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "atranscription", False ) is not True ): global openMeterLogger if openMeterLogger is None: print_verbose("Instantiates openmeter client") openMeterLogger = OpenMeterLogger() if self.stream and complete_streaming_response is None: openMeterLogger.log_stream_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) else: if self.stream and complete_streaming_response: self.model_call_details["complete_response"] = ( self.model_call_details.get( "complete_streaming_response", {} ) ) result = self.model_call_details["complete_response"] openMeterLogger.log_success_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) if ( isinstance(callback, CustomLogger) and self.model_call_details.get("litellm_params", {}).get( "acompletion", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aembedding", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aimage_generation", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "atranscription", False ) is not True and self.call_type != CallTypes.pass_through.value # pass-through endpoints call async_log_success_event ): # custom logger class if self.stream and complete_streaming_response is None: callback.log_stream_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) else: if self.stream and complete_streaming_response: self.model_call_details["complete_response"] = ( self.model_call_details.get( "complete_streaming_response", {} ) ) result = self.model_call_details["complete_response"] callback.log_success_event( kwargs=self.model_call_details, 
response_obj=result, start_time=start_time, end_time=end_time, ) if ( callable(callback) is True and self.model_call_details.get("litellm_params", {}).get( "acompletion", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aembedding", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aimage_generation", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "atranscription", False ) is not True and customLogger is not None ): # custom logger functions print_verbose( "success callbacks: Running Custom Callback Function - {}".format( callback ) ) customLogger.log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, callback_func=callback, ) except Exception as e: print_verbose( f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging with integrations {traceback.format_exc()}" ) print_verbose( f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" ) if capture_exception: # log this error to sentry for debugging capture_exception(e) except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {}".format( str(e) ), ) async def async_success_handler( # noqa: PLR0915 self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs ): """ Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions. """ print_verbose( "Logging Details LiteLLM-Async Success Call, cache_hit={}".format(cache_hit) ) start_time, end_time, result = self._success_handler_helper_fn( start_time=start_time, end_time=end_time, result=result, cache_hit=cache_hit, standard_logging_object=kwargs.get("standard_logging_object", None), ) ## BUILD COMPLETE STREAMED RESPONSE if "async_complete_streaming_response" in self.model_call_details: return # break out of this. complete_streaming_response: Optional[ Union[ModelResponse, TextCompletionResponse] ] = self._get_assembled_streaming_response( result=result, start_time=start_time, end_time=end_time, is_async=True, streaming_chunks=self.streaming_chunks, ) if complete_streaming_response is not None: print_verbose("Async success callbacks: Got a complete streaming response") self.model_call_details["async_complete_streaming_response"] = ( complete_streaming_response ) try: if self.model_call_details.get("cache_hit", False) is True: self.model_call_details["response_cost"] = 0.0 else: # check if base_model set on azure _get_base_model_from_metadata( model_call_details=self.model_call_details ) # base_model defaults to None if not set on model_info self.model_call_details["response_cost"] = ( self._response_cost_calculator( result=complete_streaming_response ) ) verbose_logger.debug( f"Model={self.model}; cost={self.model_call_details['response_cost']}" ) except litellm.NotFoundError: verbose_logger.warning( f"Model={self.model} not found in completion cost map. 
Setting 'response_cost' to None" ) self.model_call_details["response_cost"] = None ## STANDARDIZED LOGGING PAYLOAD self.model_call_details["standard_logging_object"] = ( get_standard_logging_object_payload( kwargs=self.model_call_details, init_response_obj=complete_streaming_response, start_time=start_time, end_time=end_time, logging_obj=self, status="success", ) ) callbacks = self.get_combined_callback_list( dynamic_success_callbacks=self.dynamic_async_success_callbacks, global_callbacks=litellm._async_success_callback, ) result = redact_message_input_output_from_logging( model_call_details=( self.model_call_details if hasattr(self, "model_call_details") else {} ), result=result, ) ## LOGGING HOOK ## for callback in callbacks: if isinstance(callback, CustomGuardrail): from litellm.types.guardrails import GuardrailEventHooks if ( callback.should_run_guardrail( data=self.model_call_details, event_type=GuardrailEventHooks.logging_only, ) is not True ): continue self.model_call_details, result = await callback.async_logging_hook( kwargs=self.model_call_details, result=result, call_type=self.call_type, ) elif isinstance(callback, CustomLogger): result = redact_message_input_output_from_custom_logger( result=result, litellm_logging_obj=self, custom_logger=callback ) self.model_call_details, result = await callback.async_logging_hook( kwargs=self.model_call_details, result=result, call_type=self.call_type, ) for callback in callbacks: # check if callback can run for this request litellm_params = self.model_call_details.get("litellm_params", {}) should_run = self.should_run_callback( callback=callback, litellm_params=litellm_params, event_hook="async_success_handler", ) if not should_run: continue try: if callback == "openmeter" and openMeterLogger is not None: if self.stream is True: if ( "async_complete_streaming_response" in self.model_call_details ): await openMeterLogger.async_log_success_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ "async_complete_streaming_response" ], start_time=start_time, end_time=end_time, ) else: await openMeterLogger.async_log_stream_event( # [TODO]: move this to being an async log stream event function kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) else: await openMeterLogger.async_log_success_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) if isinstance(callback, CustomLogger): # custom logger class if self.stream is True: if ( "async_complete_streaming_response" in self.model_call_details ): await callback.async_log_success_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ "async_complete_streaming_response" ], start_time=start_time, end_time=end_time, ) else: await callback.async_log_stream_event( # [TODO]: move this to being an async log stream event function kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) else: await callback.async_log_success_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) if callable(callback): # custom logger functions global customLogger if customLogger is None: customLogger = CustomLogger() if self.stream: if ( "async_complete_streaming_response" in self.model_call_details ): await customLogger.async_log_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ "async_complete_streaming_response" ], start_time=start_time, end_time=end_time, 
print_verbose=print_verbose, callback_func=callback, ) else: await customLogger.async_log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, callback_func=callback, ) if callback == "dynamodb": global dynamoLogger if dynamoLogger is None: dynamoLogger = DyanmoDBLogger() if self.stream: if ( "async_complete_streaming_response" in self.model_call_details ): print_verbose( "DynamoDB Logger: Got Stream Event - Completed Stream Response" ) await dynamoLogger._async_log_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ "async_complete_streaming_response" ], start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) else: print_verbose( "DynamoDB Logger: Got Stream Event - No complete stream response as yet" ) else: await dynamoLogger._async_log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) except Exception: verbose_logger.error( f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}" ) pass def _failure_handler_helper_fn( self, exception, traceback_exception, start_time=None, end_time=None ): if start_time is None: start_time = self.start_time if end_time is None: end_time = datetime.datetime.now() # on some exceptions, model_call_details is not always initialized, this ensures that we still log those exceptions if not hasattr(self, "model_call_details"): self.model_call_details = {} self.model_call_details["log_event_type"] = "failed_api_call" self.model_call_details["exception"] = exception self.model_call_details["traceback_exception"] = traceback_exception self.model_call_details["end_time"] = end_time self.model_call_details.setdefault("original_response", None) self.model_call_details["response_cost"] = 0 if hasattr(exception, "headers") and isinstance(exception.headers, dict): self.model_call_details.setdefault("litellm_params", {}) metadata = ( self.model_call_details["litellm_params"].get("metadata", {}) or {} ) metadata.update(exception.headers) ## STANDARDIZED LOGGING PAYLOAD self.model_call_details["standard_logging_object"] = ( get_standard_logging_object_payload( kwargs=self.model_call_details, init_response_obj={}, start_time=start_time, end_time=end_time, logging_obj=self, status="failure", error_str=str(exception), original_exception=exception, ) ) return start_time, end_time async def special_failure_handlers(self, exception: Exception): """ Custom events, emitted for specific failures. 
Currently just for router model group rate limit error """ from litellm.types.router import RouterErrors litellm_params: dict = self.model_call_details.get("litellm_params") or {} metadata = litellm_params.get("metadata") or {} ## BASE CASE ## check if rate limit error for model group size 1 is_base_case = False if metadata.get("model_group_size") is not None: model_group_size = metadata.get("model_group_size") if isinstance(model_group_size, int) and model_group_size == 1: is_base_case = True ## check if special error ## if ( RouterErrors.no_deployments_available.value not in str(exception) and is_base_case is False ): return ## get original model group ## model_group = metadata.get("model_group") or None for callback in litellm._async_failure_callback: if isinstance(callback, CustomLogger): # custom logger class await callback.log_model_group_rate_limit_error( exception=exception, original_model_group=model_group, kwargs=self.model_call_details, ) # type: ignore def failure_handler( # noqa: PLR0915 self, exception, traceback_exception, start_time=None, end_time=None ): verbose_logger.debug( f"Logging Details LiteLLM-Failure Call: {litellm.failure_callback}" ) try: start_time, end_time = self._failure_handler_helper_fn( exception=exception, traceback_exception=traceback_exception, start_time=start_time, end_time=end_time, ) callbacks = self.get_combined_callback_list( dynamic_success_callbacks=self.dynamic_failure_callbacks, global_callbacks=litellm.failure_callback, ) result = None # result sent to all loggers, init this to None incase it's not created result = redact_message_input_output_from_logging( model_call_details=( self.model_call_details if hasattr(self, "model_call_details") else {} ), result=result, ) for callback in callbacks: try: if callback == "lunary" and lunaryLogger is not None: print_verbose("reaches lunary for logging error!") model = self.model input = self.model_call_details["input"] _type = ( "embed" if self.call_type == CallTypes.embedding.value else "llm" ) lunaryLogger.log_event( kwargs=self.model_call_details, type=_type, event="error", user_id=self.model_call_details.get("user", "default"), model=model, input=input, error=traceback_exception, run_id=self.litellm_call_id, start_time=start_time, end_time=end_time, print_verbose=print_verbose, ) if callback == "sentry": print_verbose("sending exception to sentry") if capture_exception: capture_exception(exception) else: print_verbose( f"capture exception not initialized: {capture_exception}" ) elif callback == "supabase" and supabaseClient is not None: print_verbose("reaches supabase for logging!") print_verbose(f"supabaseClient: {supabaseClient}") supabaseClient.log_event( model=self.model if hasattr(self, "model") else "", messages=self.messages, end_user=self.model_call_details.get("user", "default"), response_obj=result, start_time=start_time, end_time=end_time, litellm_call_id=self.model_call_details["litellm_call_id"], print_verbose=print_verbose, ) if ( callable(callback) and customLogger is not None ): # custom logger functions customLogger.log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, callback_func=callback, ) if ( isinstance(callback, CustomLogger) and self.model_call_details.get("litellm_params", {}).get( "acompletion", False ) is not True and self.model_call_details.get("litellm_params", {}).get( "aembedding", False ) is not True ): # custom logger class callback.log_failure_event( start_time=start_time, 
end_time=end_time, response_obj=result, kwargs=self.model_call_details, ) if callback == "langfuse": global langFuseLogger verbose_logger.debug("reaches langfuse for logging failure") kwargs = {} for k, v in self.model_call_details.items(): if ( k != "original_response" ): # copy.deepcopy raises errors as this could be a coroutine kwargs[k] = v # this only logs streaming once, complete_streaming_response exists i.e when stream ends langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( globalLangfuseLogger=langFuseLogger, standard_callback_dynamic_params=self.standard_callback_dynamic_params, in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, ) _response = langfuse_logger_to_use._old_log_event( start_time=start_time, end_time=end_time, response_obj=None, user_id=kwargs.get("user", None), print_verbose=print_verbose, status_message=str(exception), level="ERROR", kwargs=self.model_call_details, ) if _response is not None and isinstance(_response, dict): _trace_id = _response.get("trace_id", None) if _trace_id is not None: in_memory_trace_id_cache.set_cache( litellm_call_id=self.litellm_call_id, service_name="langfuse", trace_id=_trace_id, ) if callback == "traceloop": traceloopLogger.log_event( start_time=start_time, end_time=end_time, response_obj=None, user_id=self.model_call_details.get("user", None), print_verbose=print_verbose, status_message=str(exception), level="ERROR", kwargs=self.model_call_details, ) if callback == "logfire" and logfireLogger is not None: verbose_logger.debug("reaches logfire for failure logging!") kwargs = {} for k, v in self.model_call_details.items(): if ( k != "original_response" ): # copy.deepcopy raises errors as this could be a coroutine kwargs[k] = v kwargs["exception"] = exception logfireLogger.log_event( kwargs=kwargs, response_obj=result, start_time=start_time, end_time=end_time, level=LogfireLevel.ERROR.value, # type: ignore print_verbose=print_verbose, ) except Exception as e: print_verbose( f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}" ) print_verbose( f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" ) if capture_exception: # log this error to sentry for debugging capture_exception(e) except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging {}".format( str(e) ) ) async def async_failure_handler( self, exception, traceback_exception, start_time=None, end_time=None ): """ Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions. 
""" await self.special_failure_handlers(exception=exception) start_time, end_time = self._failure_handler_helper_fn( exception=exception, traceback_exception=traceback_exception, start_time=start_time, end_time=end_time, ) callbacks = self.get_combined_callback_list( dynamic_success_callbacks=self.dynamic_async_failure_callbacks, global_callbacks=litellm._async_failure_callback, ) result = None # result sent to all loggers, init this to None incase it's not created for callback in callbacks: try: if isinstance(callback, CustomLogger): # custom logger class await callback.async_log_failure_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, ) # type: ignore if ( callable(callback) and customLogger is not None ): # custom logger functions await customLogger.async_log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, end_time=end_time, print_verbose=print_verbose, callback_func=callback, ) except Exception as e: verbose_logger.exception( "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure \ logging {}\nCallback={}".format( str(e), callback ) ) def _get_trace_id(self, service_name: Literal["langfuse"]) -> Optional[str]: """ For the given service (e.g. langfuse), return the trace_id actually logged. Used for constructing the url in slack alerting. Returns: - str: The logged trace id - None: If trace id not yet emitted. """ trace_id: Optional[str] = None if service_name == "langfuse": trace_id = in_memory_trace_id_cache.get_cache( litellm_call_id=self.litellm_call_id, service_name=service_name ) return trace_id def _get_callback_object(self, service_name: Literal["langfuse"]) -> Optional[Any]: """ Return dynamic callback object. Meant to solve issue when doing key-based/team-based logging """ global langFuseLogger if service_name == "langfuse": if langFuseLogger is None or ( ( self.standard_callback_dynamic_params.get("langfuse_public_key") is not None and self.standard_callback_dynamic_params.get("langfuse_public_key") != langFuseLogger.public_key ) or ( self.standard_callback_dynamic_params.get("langfuse_public_key") is not None and self.standard_callback_dynamic_params.get("langfuse_public_key") != langFuseLogger.public_key ) or ( self.standard_callback_dynamic_params.get("langfuse_host") is not None and self.standard_callback_dynamic_params.get("langfuse_host") != langFuseLogger.langfuse_host ) ): return LangFuseLogger( langfuse_public_key=self.standard_callback_dynamic_params.get( "langfuse_public_key" ), langfuse_secret=self.standard_callback_dynamic_params.get( "langfuse_secret" ), langfuse_host=self.standard_callback_dynamic_params.get( "langfuse_host" ), ) return langFuseLogger return None def handle_sync_success_callbacks_for_async_calls( self, result: Any, start_time: datetime.datetime, end_time: datetime.datetime, ) -> None: """ Handles calling success callbacks for Async calls. Why: Some callbacks - `langfuse`, `s3` are sync callbacks. We need to call them in the executor. """ if self._should_run_sync_callbacks_for_async_calls() is False: return executor.submit( self.success_handler, result, start_time, end_time, ) def _should_run_sync_callbacks_for_async_calls(self) -> bool: """ Returns: - bool: True if sync callbacks should be run for async calls. eg. 
`langfuse`, `s3` """ _combined_sync_callbacks = self.get_combined_callback_list( dynamic_success_callbacks=self.dynamic_success_callbacks, global_callbacks=litellm.success_callback, ) _filtered_success_callbacks = self._remove_internal_custom_logger_callbacks( _combined_sync_callbacks ) _filtered_success_callbacks = self._remove_internal_litellm_callbacks( _filtered_success_callbacks ) return len(_filtered_success_callbacks) > 0 def get_combined_callback_list( self, dynamic_success_callbacks: Optional[List], global_callbacks: List ) -> List: if dynamic_success_callbacks is None: return global_callbacks return list(set(dynamic_success_callbacks + global_callbacks)) def _remove_internal_litellm_callbacks(self, callbacks: List) -> List: """ Creates a filtered list of callbacks, excluding internal LiteLLM callbacks. Args: callbacks: List of callback functions/strings to filter Returns: List of filtered callbacks with internal ones removed """ filtered = [ cb for cb in callbacks if not self._is_internal_litellm_proxy_callback(cb) ] verbose_logger.debug(f"Filtered callbacks: {filtered}") return filtered def _get_callback_name(self, cb) -> str: """ Helper to get the name of a callback function Args: cb: The callback function/string to get the name of Returns: The name of the callback """ if hasattr(cb, "__name__"): return cb.__name__ if hasattr(cb, "__func__"): return cb.__func__.__name__ return str(cb) def _is_internal_litellm_proxy_callback(self, cb) -> bool: """Helper to check if a callback is internal""" INTERNAL_PREFIXES = [ "_PROXY", "_service_logger.ServiceLogging", "sync_deployment_callback_on_success", ] if isinstance(cb, str): return False if not callable(cb): return True cb_name = self._get_callback_name(cb) return any(prefix in cb_name for prefix in INTERNAL_PREFIXES) def _remove_internal_custom_logger_callbacks(self, callbacks: List) -> List: """ Removes internal custom logger callbacks from the list. 
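# Standalone sketch of how the helpers above combine and filter callbacks:
# dynamic (request-level) callbacks are unioned with the globals and
# de-duplicated, then internal proxy callbacks are dropped by name prefix.
def _example_combine(dynamic, global_cbs):
    # mirrors get_combined_callback_list
    if dynamic is None:
        return global_cbs
    return list(set(dynamic + global_cbs))


assert sorted(_example_combine(["langfuse"], ["langfuse", "s3"])) == ["langfuse", "s3"]
assert _example_combine(None, ["s3"]) == ["s3"]
# A callable whose name contains one of the INTERNAL_PREFIXES above, e.g.
# "sync_deployment_callback_on_success", is then removed by
# _is_internal_litellm_proxy_callback; plain strings are always kept.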
""" _new_callbacks = [] for _c in callbacks: if isinstance(_c, CustomLogger): continue elif ( isinstance(_c, str) and _c in litellm._known_custom_logger_compatible_callbacks ): continue _new_callbacks.append(_c) return _new_callbacks def _get_assembled_streaming_response( self, result: Union[ModelResponse, TextCompletionResponse, ModelResponseStream, Any], start_time: datetime.datetime, end_time: datetime.datetime, is_async: bool, streaming_chunks: List[Any], ) -> Optional[Union[ModelResponse, TextCompletionResponse]]: if isinstance(result, ModelResponse): return result elif isinstance(result, TextCompletionResponse): return result elif isinstance(result, ModelResponseStream): complete_streaming_response: Optional[ Union[ModelResponse, TextCompletionResponse] ] = _assemble_complete_response_from_streaming_chunks( result=result, start_time=start_time, end_time=end_time, request_kwargs=self.model_call_details, streaming_chunks=streaming_chunks, is_async=is_async, ) return complete_streaming_response return None def set_callbacks(callback_list, function_id=None): # noqa: PLR0915 """ Globally sets the callback client """ global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, supabaseClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger try: for callback in callback_list: if callback == "sentry": try: import sentry_sdk except ImportError: print_verbose("Package 'sentry_sdk' is missing. Installing it...") subprocess.check_call( [sys.executable, "-m", "pip", "install", "sentry_sdk"] ) import sentry_sdk sentry_sdk_instance = sentry_sdk sentry_trace_rate = ( os.environ.get("SENTRY_API_TRACE_RATE") if "SENTRY_API_TRACE_RATE" in os.environ else "1.0" ) sentry_sdk_instance.init( dsn=os.environ.get("SENTRY_DSN"), traces_sample_rate=float(sentry_trace_rate), # type: ignore ) capture_exception = sentry_sdk_instance.capture_exception add_breadcrumb = sentry_sdk_instance.add_breadcrumb elif callback == "posthog": try: from posthog import Posthog except ImportError: print_verbose("Package 'posthog' is missing. Installing it...") subprocess.check_call( [sys.executable, "-m", "pip", "install", "posthog"] ) from posthog import Posthog posthog = Posthog( project_api_key=os.environ.get("POSTHOG_API_KEY"), host=os.environ.get("POSTHOG_API_URL"), ) elif callback == "slack": try: from slack_bolt import App except ImportError: print_verbose("Package 'slack_bolt' is missing. 
Installing it...") subprocess.check_call( [sys.executable, "-m", "pip", "install", "slack_bolt"] ) from slack_bolt import App slack_app = App( token=os.environ.get("SLACK_API_TOKEN"), signing_secret=os.environ.get("SLACK_API_SECRET"), ) alerts_channel = os.environ["SLACK_API_CHANNEL"] print_verbose(f"Initialized Slack App: {slack_app}") elif callback == "traceloop": traceloopLogger = TraceloopLogger() elif callback == "athina": athinaLogger = AthinaLogger() print_verbose("Initialized Athina Logger") elif callback == "helicone": heliconeLogger = HeliconeLogger() elif callback == "lunary": lunaryLogger = LunaryLogger() elif callback == "promptlayer": promptLayerLogger = PromptLayerLogger() elif callback == "langfuse": langFuseLogger = LangFuseLogger( langfuse_public_key=None, langfuse_secret=None, langfuse_host=None ) elif callback == "openmeter": openMeterLogger = OpenMeterLogger() elif callback == "datadog": dataDogLogger = DataDogLogger() elif callback == "dynamodb": dynamoLogger = DyanmoDBLogger() elif callback == "s3": s3Logger = S3Logger() elif callback == "wandb": weightsBiasesLogger = WeightsBiasesLogger() elif callback == "logfire": logfireLogger = LogfireLogger() elif callback == "supabase": print_verbose("instantiating supabase") supabaseClient = Supabase() elif callback == "greenscale": greenscaleLogger = GreenscaleLogger() print_verbose("Initialized Greenscale Logger") elif callable(callback): customLogger = CustomLogger() except Exception as e: raise e def _init_custom_logger_compatible_class( # noqa: PLR0915 logging_integration: _custom_logger_compatible_callbacks_literal, internal_usage_cache: Optional[DualCache], llm_router: Optional[ Any ], # expect litellm.Router, but typing errors due to circular import custom_logger_init_args: Optional[dict] = {}, ) -> Optional[CustomLogger]: """ Initialize a custom logger compatible class """ try: custom_logger_init_args = custom_logger_init_args or {} if logging_integration == "lago": for callback in _in_memory_loggers: if isinstance(callback, LagoLogger): return callback # type: ignore lago_logger = LagoLogger() _in_memory_loggers.append(lago_logger) return lago_logger # type: ignore elif logging_integration == "openmeter": for callback in _in_memory_loggers: if isinstance(callback, OpenMeterLogger): return callback # type: ignore _openmeter_logger = OpenMeterLogger() _in_memory_loggers.append(_openmeter_logger) return _openmeter_logger # type: ignore elif logging_integration == "braintrust": for callback in _in_memory_loggers: if isinstance(callback, BraintrustLogger): return callback # type: ignore braintrust_logger = BraintrustLogger() _in_memory_loggers.append(braintrust_logger) return braintrust_logger # type: ignore elif logging_integration == "langsmith": for callback in _in_memory_loggers: if isinstance(callback, LangsmithLogger): return callback # type: ignore _langsmith_logger = LangsmithLogger() _in_memory_loggers.append(_langsmith_logger) return _langsmith_logger # type: ignore elif logging_integration == "argilla": for callback in _in_memory_loggers: if isinstance(callback, ArgillaLogger): return callback # type: ignore _argilla_logger = ArgillaLogger() _in_memory_loggers.append(_argilla_logger) return _argilla_logger # type: ignore elif logging_integration == "literalai": for callback in _in_memory_loggers: if isinstance(callback, LiteralAILogger): return callback # type: ignore _literalai_logger = LiteralAILogger() _in_memory_loggers.append(_literalai_logger) return _literalai_logger # type: ignore elif 
logging_integration == "prometheus": for callback in _in_memory_loggers: if isinstance(callback, PrometheusLogger): return callback # type: ignore _prometheus_logger = PrometheusLogger() _in_memory_loggers.append(_prometheus_logger) return _prometheus_logger # type: ignore elif logging_integration == "datadog": for callback in _in_memory_loggers: if isinstance(callback, DataDogLogger): return callback # type: ignore _datadog_logger = DataDogLogger() _in_memory_loggers.append(_datadog_logger) return _datadog_logger # type: ignore elif logging_integration == "datadog_llm_observability": _datadog_llm_obs_logger = DataDogLLMObsLogger() _in_memory_loggers.append(_datadog_llm_obs_logger) return _datadog_llm_obs_logger # type: ignore elif logging_integration == "gcs_bucket": for callback in _in_memory_loggers: if isinstance(callback, GCSBucketLogger): return callback # type: ignore _gcs_bucket_logger = GCSBucketLogger() _in_memory_loggers.append(_gcs_bucket_logger) return _gcs_bucket_logger # type: ignore elif logging_integration == "azure_storage": for callback in _in_memory_loggers: if isinstance(callback, AzureBlobStorageLogger): return callback # type: ignore _azure_storage_logger = AzureBlobStorageLogger() _in_memory_loggers.append(_azure_storage_logger) return _azure_storage_logger # type: ignore elif logging_integration == "opik": for callback in _in_memory_loggers: if isinstance(callback, OpikLogger): return callback # type: ignore _opik_logger = OpikLogger() _in_memory_loggers.append(_opik_logger) return _opik_logger # type: ignore elif logging_integration == "arize": from litellm.integrations.opentelemetry import ( OpenTelemetry, OpenTelemetryConfig, ) otel_config = ArizeLogger.get_arize_opentelemetry_config() if otel_config is None: raise ValueError( "No valid endpoint found for Arize, please set 'ARIZE_ENDPOINT' to your GRPC endpoint or 'ARIZE_HTTP_ENDPOINT' to your HTTP endpoint" ) os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( f"space_key={os.getenv('ARIZE_SPACE_KEY')},api_key={os.getenv('ARIZE_API_KEY')}" ) for callback in _in_memory_loggers: if ( isinstance(callback, OpenTelemetry) and callback.callback_name == "arize" ): return callback # type: ignore _otel_logger = OpenTelemetry(config=otel_config, callback_name="arize") _in_memory_loggers.append(_otel_logger) return _otel_logger # type: ignore elif logging_integration == "otel": from litellm.integrations.opentelemetry import OpenTelemetry for callback in _in_memory_loggers: if isinstance(callback, OpenTelemetry): return callback # type: ignore otel_logger = OpenTelemetry( **_get_custom_logger_settings_from_proxy_server( callback_name=logging_integration ) ) _in_memory_loggers.append(otel_logger) return otel_logger # type: ignore elif logging_integration == "galileo": for callback in _in_memory_loggers: if isinstance(callback, GalileoObserve): return callback # type: ignore galileo_logger = GalileoObserve() _in_memory_loggers.append(galileo_logger) return galileo_logger # type: ignore elif logging_integration == "logfire": if "LOGFIRE_TOKEN" not in os.environ: raise ValueError("LOGFIRE_TOKEN not found in environment variables") from litellm.integrations.opentelemetry import ( OpenTelemetry, OpenTelemetryConfig, ) otel_config = OpenTelemetryConfig( exporter="otlp_http", endpoint="https://logfire-api.pydantic.dev/v1/traces", headers=f"Authorization={os.getenv('LOGFIRE_TOKEN')}", ) for callback in _in_memory_loggers: if isinstance(callback, OpenTelemetry): return callback # type: ignore _otel_logger = 
OpenTelemetry(config=otel_config) _in_memory_loggers.append(_otel_logger) return _otel_logger # type: ignore elif logging_integration == "dynamic_rate_limiter": from litellm.proxy.hooks.dynamic_rate_limiter import ( _PROXY_DynamicRateLimitHandler, ) for callback in _in_memory_loggers: if isinstance(callback, _PROXY_DynamicRateLimitHandler): return callback # type: ignore if internal_usage_cache is None: raise Exception( "Internal Error: Cache cannot be empty - internal_usage_cache={}".format( internal_usage_cache ) ) dynamic_rate_limiter_obj = _PROXY_DynamicRateLimitHandler( internal_usage_cache=internal_usage_cache ) if llm_router is not None and isinstance(llm_router, litellm.Router): dynamic_rate_limiter_obj.update_variables(llm_router=llm_router) _in_memory_loggers.append(dynamic_rate_limiter_obj) return dynamic_rate_limiter_obj # type: ignore elif logging_integration == "langtrace": if "LANGTRACE_API_KEY" not in os.environ: raise ValueError("LANGTRACE_API_KEY not found in environment variables") from litellm.integrations.opentelemetry import ( OpenTelemetry, OpenTelemetryConfig, ) otel_config = OpenTelemetryConfig( exporter="otlp_http", endpoint="https://langtrace.ai/api/trace", ) os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( f"api_key={os.getenv('LANGTRACE_API_KEY')}" ) for callback in _in_memory_loggers: if ( isinstance(callback, OpenTelemetry) and callback.callback_name == "langtrace" ): return callback # type: ignore _otel_logger = OpenTelemetry(config=otel_config, callback_name="langtrace") _in_memory_loggers.append(_otel_logger) return _otel_logger # type: ignore elif logging_integration == "mlflow": for callback in _in_memory_loggers: if isinstance(callback, MlflowLogger): return callback # type: ignore _mlflow_logger = MlflowLogger() _in_memory_loggers.append(_mlflow_logger) return _mlflow_logger # type: ignore elif logging_integration == "langfuse": for callback in _in_memory_loggers: if isinstance(callback, LangfusePromptManagement): return callback langfuse_logger = LangfusePromptManagement() _in_memory_loggers.append(langfuse_logger) return langfuse_logger # type: ignore elif logging_integration == "pagerduty": for callback in _in_memory_loggers: if isinstance(callback, PagerDutyAlerting): return callback pagerduty_logger = PagerDutyAlerting(**custom_logger_init_args) _in_memory_loggers.append(pagerduty_logger) return pagerduty_logger # type: ignore elif logging_integration == "gcs_pubsub": for callback in _in_memory_loggers: if isinstance(callback, GcsPubSubLogger): return callback _gcs_pubsub_logger = GcsPubSubLogger() _in_memory_loggers.append(_gcs_pubsub_logger) return _gcs_pubsub_logger # type: ignore elif logging_integration == "humanloop": for callback in _in_memory_loggers: if isinstance(callback, HumanloopLogger): return callback humanloop_logger = HumanloopLogger() _in_memory_loggers.append(humanloop_logger) return humanloop_logger # type: ignore except Exception as e: verbose_logger.exception( f"[Non-Blocking Error] Error initializing custom logger: {e}" ) return None def get_custom_logger_compatible_class( # noqa: PLR0915 logging_integration: _custom_logger_compatible_callbacks_literal, ) -> Optional[CustomLogger]: try: if logging_integration == "lago": for callback in _in_memory_loggers: if isinstance(callback, LagoLogger): return callback elif logging_integration == "openmeter": for callback in _in_memory_loggers: if isinstance(callback, OpenMeterLogger): return callback elif logging_integration == "braintrust": for callback in _in_memory_loggers: if 
isinstance(callback, BraintrustLogger): return callback elif logging_integration == "galileo": for callback in _in_memory_loggers: if isinstance(callback, GalileoObserve): return callback elif logging_integration == "langsmith": for callback in _in_memory_loggers: if isinstance(callback, LangsmithLogger): return callback elif logging_integration == "argilla": for callback in _in_memory_loggers: if isinstance(callback, ArgillaLogger): return callback elif logging_integration == "literalai": for callback in _in_memory_loggers: if isinstance(callback, LiteralAILogger): return callback elif logging_integration == "prometheus": for callback in _in_memory_loggers: if isinstance(callback, PrometheusLogger): return callback elif logging_integration == "datadog": for callback in _in_memory_loggers: if isinstance(callback, DataDogLogger): return callback elif logging_integration == "datadog_llm_observability": for callback in _in_memory_loggers: if isinstance(callback, DataDogLLMObsLogger): return callback elif logging_integration == "gcs_bucket": for callback in _in_memory_loggers: if isinstance(callback, GCSBucketLogger): return callback elif logging_integration == "azure_storage": for callback in _in_memory_loggers: if isinstance(callback, AzureBlobStorageLogger): return callback elif logging_integration == "opik": for callback in _in_memory_loggers: if isinstance(callback, OpikLogger): return callback elif logging_integration == "langfuse": for callback in _in_memory_loggers: if isinstance(callback, LangfusePromptManagement): return callback elif logging_integration == "otel": from litellm.integrations.opentelemetry import OpenTelemetry for callback in _in_memory_loggers: if isinstance(callback, OpenTelemetry): return callback elif logging_integration == "arize": from litellm.integrations.opentelemetry import OpenTelemetry if "ARIZE_SPACE_KEY" not in os.environ: raise ValueError("ARIZE_SPACE_KEY not found in environment variables") if "ARIZE_API_KEY" not in os.environ: raise ValueError("ARIZE_API_KEY not found in environment variables") for callback in _in_memory_loggers: if ( isinstance(callback, OpenTelemetry) and callback.callback_name == "arize" ): return callback elif logging_integration == "logfire": if "LOGFIRE_TOKEN" not in os.environ: raise ValueError("LOGFIRE_TOKEN not found in environment variables") from litellm.integrations.opentelemetry import OpenTelemetry for callback in _in_memory_loggers: if isinstance(callback, OpenTelemetry): return callback # type: ignore elif logging_integration == "dynamic_rate_limiter": from litellm.proxy.hooks.dynamic_rate_limiter import ( _PROXY_DynamicRateLimitHandler, ) for callback in _in_memory_loggers: if isinstance(callback, _PROXY_DynamicRateLimitHandler): return callback # type: ignore elif logging_integration == "langtrace": from litellm.integrations.opentelemetry import OpenTelemetry if "LANGTRACE_API_KEY" not in os.environ: raise ValueError("LANGTRACE_API_KEY not found in environment variables") for callback in _in_memory_loggers: if ( isinstance(callback, OpenTelemetry) and callback.callback_name == "langtrace" ): return callback elif logging_integration == "mlflow": for callback in _in_memory_loggers: if isinstance(callback, MlflowLogger): return callback elif logging_integration == "pagerduty": for callback in _in_memory_loggers: if isinstance(callback, PagerDutyAlerting): return callback elif logging_integration == "gcs_pubsub": for callback in _in_memory_loggers: if isinstance(callback, GcsPubSubLogger): return callback return None 
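# Note: both lookup helpers in this block treat _in_memory_loggers as an
# "initialize once, then look up" cache keyed on logger class. Assuming the
# integration's own environment variables are configured, repeated
# initialization returns the same instance (illustrative only):
#
#   first = _init_custom_logger_compatible_class(
#       "datadog", internal_usage_cache=None, llm_router=None
#   )
#   second = _init_custom_logger_compatible_class(
#       "datadog", internal_usage_cache=None, llm_router=None
#   )
#   assert first is second
#   assert get_custom_logger_compatible_class("datadog") is first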
except Exception as e: verbose_logger.exception( f"[Non-Blocking Error] Error getting custom logger: {e}" ) return None def _get_custom_logger_settings_from_proxy_server(callback_name: str) -> Dict: """ Get the settings for a custom logger from the proxy server config.yaml Proxy server config.yaml defines callback_settings as: callback_settings: otel: message_logging: False """ from litellm.proxy.proxy_server import callback_settings if callback_settings: return dict(callback_settings.get(callback_name, {})) return {} def use_custom_pricing_for_model(litellm_params: Optional[dict]) -> bool: """ Check if the model uses custom pricing Returns True if any of `SPECIAL_MODEL_INFO_PARAMS` are present in `litellm_params` or `model_info` """ if litellm_params is None: return False metadata: dict = litellm_params.get("metadata", {}) or {} model_info: dict = metadata.get("model_info", {}) or {} for _custom_cost_param in SPECIAL_MODEL_INFO_PARAMS: if litellm_params.get(_custom_cost_param, None) is not None: return True elif model_info.get(_custom_cost_param, None) is not None: return True return False def is_valid_sha256_hash(value: str) -> bool: # Check if the value is a valid SHA-256 hash (64 hexadecimal characters) return bool(re.fullmatch(r"[a-fA-F0-9]{64}", value)) class StandardLoggingPayloadSetup: @staticmethod def cleanup_timestamps( start_time: Union[dt_object, float], end_time: Union[dt_object, float], completion_start_time: Union[dt_object, float], ) -> Tuple[float, float, float]: """ Convert datetime objects to floats Args: start_time: Union[dt_object, float] end_time: Union[dt_object, float] completion_start_time: Union[dt_object, float] Returns: Tuple[float, float, float]: A tuple containing the start time, end time, and completion start time as floats. """ if isinstance(start_time, datetime.datetime): start_time_float = start_time.timestamp() elif isinstance(start_time, float): start_time_float = start_time else: raise ValueError( f"start_time is required, got={start_time} of type {type(start_time)}" ) if isinstance(end_time, datetime.datetime): end_time_float = end_time.timestamp() elif isinstance(end_time, float): end_time_float = end_time else: raise ValueError( f"end_time is required, got={end_time} of type {type(end_time)}" ) if isinstance(completion_start_time, datetime.datetime): completion_start_time_float = completion_start_time.timestamp() elif isinstance(completion_start_time, float): completion_start_time_float = completion_start_time else: completion_start_time_float = end_time_float return start_time_float, end_time_float, completion_start_time_float @staticmethod def get_standard_logging_metadata( metadata: Optional[Dict[str, Any]], litellm_params: Optional[dict] = None, prompt_integration: Optional[str] = None, ) -> StandardLoggingMetadata: """ Clean and filter the metadata dictionary to include only the specified keys in StandardLoggingMetadata. Args: metadata (Optional[Dict[str, Any]]): The original metadata dictionary. Returns: StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata. Note: - If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned. - If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'. 
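# Sketch of the user_api_key handling described above: the key is copied into
# user_api_key_hash only when it already looks like a SHA-256 digest, e.g.:
#
#   import hashlib
#   hashed = hashlib.sha256(b"sk-my-proxy-key").hexdigest()
#   is_valid_sha256_hash(hashed)             # True  -> stored as user_api_key_hash
#   is_valid_sha256_hash("sk-my-proxy-key")  # False -> left out of the hash field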
""" prompt_management_metadata: Optional[ StandardLoggingPromptManagementMetadata ] = None if litellm_params is not None: prompt_id = cast(Optional[str], litellm_params.get("prompt_id", None)) prompt_variables = cast( Optional[dict], litellm_params.get("prompt_variables", None) ) if prompt_id is not None and prompt_integration is not None: prompt_management_metadata = StandardLoggingPromptManagementMetadata( prompt_id=prompt_id, prompt_variables=prompt_variables, prompt_integration=prompt_integration, ) # Initialize with default values clean_metadata = StandardLoggingMetadata( user_api_key_hash=None, user_api_key_alias=None, user_api_key_team_id=None, user_api_key_org_id=None, user_api_key_user_id=None, user_api_key_team_alias=None, spend_logs_metadata=None, requester_ip_address=None, requester_metadata=None, user_api_key_end_user_id=None, prompt_management_metadata=prompt_management_metadata, ) if isinstance(metadata, dict): # Filter the metadata dictionary to include only the specified keys supported_keys = StandardLoggingMetadata.__annotations__.keys() for key in supported_keys: if key in metadata: clean_metadata[key] = metadata[key] # type: ignore if metadata.get("user_api_key") is not None: if is_valid_sha256_hash(str(metadata.get("user_api_key"))): clean_metadata["user_api_key_hash"] = metadata.get( "user_api_key" ) # this is the hash _potential_requester_metadata = metadata.get( "metadata", None ) # check if user passed metadata in the sdk request - e.g. metadata for langsmith logging - https://docs.litellm.ai/docs/observability/langsmith_integration#set-langsmith-fields if ( clean_metadata["requester_metadata"] is None and _potential_requester_metadata is not None and isinstance(_potential_requester_metadata, dict) ): clean_metadata["requester_metadata"] = _potential_requester_metadata return clean_metadata @staticmethod def get_usage_from_response_obj(response_obj: Optional[dict]) -> Usage: ## BASE CASE ## if response_obj is None: return Usage( prompt_tokens=0, completion_tokens=0, total_tokens=0, ) usage = response_obj.get("usage", None) or {} if usage is None or ( not isinstance(usage, dict) and not isinstance(usage, Usage) ): return Usage( prompt_tokens=0, completion_tokens=0, total_tokens=0, ) elif isinstance(usage, Usage): return usage elif isinstance(usage, dict): return Usage(**usage) raise ValueError(f"usage is required, got={usage} of type {type(usage)}") @staticmethod def get_model_cost_information( base_model: Optional[str], custom_pricing: Optional[bool], custom_llm_provider: Optional[str], init_response_obj: Union[Any, BaseModel, dict], ) -> StandardLoggingModelInformation: model_cost_name = _select_model_name_for_cost_calc( model=None, completion_response=init_response_obj, # type: ignore base_model=base_model, custom_pricing=custom_pricing, ) if model_cost_name is None: model_cost_information = StandardLoggingModelInformation( model_map_key="", model_map_value=None ) else: try: _model_cost_information = litellm.get_model_info( model=model_cost_name, custom_llm_provider=custom_llm_provider ) model_cost_information = StandardLoggingModelInformation( model_map_key=model_cost_name, model_map_value=_model_cost_information, ) except Exception: verbose_logger.debug( # keep in debug otherwise it will trigger on every call "Model={} is not mapped in model cost map. 
Defaulting to None model_cost_information for standard_logging_payload".format( model_cost_name ) ) model_cost_information = StandardLoggingModelInformation( model_map_key=model_cost_name, model_map_value=None ) return model_cost_information @staticmethod def get_final_response_obj( response_obj: dict, init_response_obj: Union[Any, BaseModel, dict], kwargs: dict ) -> Optional[Union[dict, str, list]]: """ Get final response object after redacting the message input/output from logging """ if response_obj is not None: final_response_obj: Optional[Union[dict, str, list]] = response_obj elif isinstance(init_response_obj, list) or isinstance(init_response_obj, str): final_response_obj = init_response_obj else: final_response_obj = None modified_final_response_obj = redact_message_input_output_from_logging( model_call_details=kwargs, result=final_response_obj, ) if modified_final_response_obj is not None and isinstance( modified_final_response_obj, BaseModel ): final_response_obj = modified_final_response_obj.model_dump() else: final_response_obj = modified_final_response_obj return final_response_obj @staticmethod def get_additional_headers( additiona_headers: Optional[dict], ) -> Optional[StandardLoggingAdditionalHeaders]: if additiona_headers is None: return None additional_logging_headers: StandardLoggingAdditionalHeaders = {} for key in StandardLoggingAdditionalHeaders.__annotations__.keys(): _key = key.lower() _key = _key.replace("_", "-") if _key in additiona_headers: try: additional_logging_headers[key] = int(additiona_headers[_key]) # type: ignore except (ValueError, TypeError): verbose_logger.debug( f"Could not convert {additiona_headers[_key]} to int for key {key}." ) return additional_logging_headers @staticmethod def get_hidden_params( hidden_params: Optional[dict], ) -> StandardLoggingHiddenParams: clean_hidden_params = StandardLoggingHiddenParams( model_id=None, cache_key=None, api_base=None, response_cost=None, additional_headers=None, litellm_overhead_time_ms=None, ) if hidden_params is not None: for key in StandardLoggingHiddenParams.__annotations__.keys(): if key in hidden_params: if key == "additional_headers": clean_hidden_params["additional_headers"] = ( StandardLoggingPayloadSetup.get_additional_headers( hidden_params[key] ) ) else: clean_hidden_params[key] = hidden_params[key] # type: ignore return clean_hidden_params @staticmethod def strip_trailing_slash(api_base: Optional[str]) -> Optional[str]: if api_base: return api_base.rstrip("/") return api_base @staticmethod def get_error_information( original_exception: Optional[Exception], ) -> StandardLoggingPayloadErrorInformation: error_status: str = str(getattr(original_exception, "status_code", "")) error_class: str = ( str(original_exception.__class__.__name__) if original_exception else "" ) _llm_provider_in_exception = getattr(original_exception, "llm_provider", "") return StandardLoggingPayloadErrorInformation( error_code=error_status, error_class=error_class, llm_provider=_llm_provider_in_exception, ) @staticmethod def get_response_time( start_time_float: float, end_time_float: float, completion_start_time_float: float, stream: bool, ) -> float: """ Get the response time for the LLM response Args: start_time_float: float - start time of the LLM call end_time_float: float - end time of the LLM call completion_start_time_float: float - time to first token of the LLM response (for streaming responses) stream: bool - True when a stream response is returned Returns: float: The response time for the LLM response """ if 
stream is True: return completion_start_time_float - start_time_float else: return end_time_float - start_time_float def get_standard_logging_object_payload( kwargs: Optional[dict], init_response_obj: Union[Any, BaseModel, dict], start_time: dt_object, end_time: dt_object, logging_obj: Logging, status: StandardLoggingPayloadStatus, error_str: Optional[str] = None, original_exception: Optional[Exception] = None, ) -> Optional[StandardLoggingPayload]: try: kwargs = kwargs or {} hidden_params: Optional[dict] = None if init_response_obj is None: response_obj = {} elif isinstance(init_response_obj, BaseModel): response_obj = init_response_obj.model_dump() hidden_params = getattr(init_response_obj, "_hidden_params", None) elif isinstance(init_response_obj, dict): response_obj = init_response_obj else: response_obj = {} if original_exception is not None and hidden_params is None: response_headers = _get_response_headers(original_exception) if response_headers is not None: hidden_params = dict( StandardLoggingHiddenParams( additional_headers=StandardLoggingPayloadSetup.get_additional_headers( dict(response_headers) ), model_id=None, cache_key=None, api_base=None, response_cost=None, litellm_overhead_time_ms=None, ) ) # standardize this function to be used across, s3, dynamoDB, langfuse logging litellm_params = kwargs.get("litellm_params", {}) proxy_server_request = litellm_params.get("proxy_server_request") or {} metadata: dict = ( litellm_params.get("litellm_metadata") or litellm_params.get("metadata", None) or {} ) completion_start_time = kwargs.get("completion_start_time", end_time) call_type = kwargs.get("call_type") cache_hit = kwargs.get("cache_hit", False) usage = StandardLoggingPayloadSetup.get_usage_from_response_obj( response_obj=response_obj ) id = response_obj.get("id", kwargs.get("litellm_call_id")) _model_id = metadata.get("model_info", {}).get("id", "") _model_group = metadata.get("model_group", "") request_tags = ( metadata.get("tags", []) if isinstance(metadata.get("tags", []), list) else [] ) # cleanup timestamps start_time_float, end_time_float, completion_start_time_float = ( StandardLoggingPayloadSetup.cleanup_timestamps( start_time=start_time, end_time=end_time, completion_start_time=completion_start_time, ) ) response_time = StandardLoggingPayloadSetup.get_response_time( start_time_float=start_time_float, end_time_float=end_time_float, completion_start_time_float=completion_start_time_float, stream=kwargs.get("stream", False), ) # clean up litellm hidden params clean_hidden_params = StandardLoggingPayloadSetup.get_hidden_params( hidden_params ) # clean up litellm metadata clean_metadata = StandardLoggingPayloadSetup.get_standard_logging_metadata( metadata=metadata, litellm_params=litellm_params, prompt_integration=kwargs.get("prompt_integration", None), ) _request_body = proxy_server_request.get("body", {}) end_user_id = clean_metadata["user_api_key_end_user_id"] or _request_body.get( "user", None ) # maintain backwards compatibility with old request body check saved_cache_cost: float = 0.0 if cache_hit is True: id = f"{id}_cache_hit{time.time()}" # do not duplicate the request id saved_cache_cost = ( logging_obj._response_cost_calculator( result=init_response_obj, cache_hit=False # type: ignore ) or 0.0 ) ## Get model cost information ## base_model = _get_base_model_from_metadata(model_call_details=kwargs) custom_pricing = use_custom_pricing_for_model(litellm_params=litellm_params) model_cost_information = StandardLoggingPayloadSetup.get_model_cost_information( 
base_model=base_model, custom_pricing=custom_pricing, custom_llm_provider=kwargs.get("custom_llm_provider"), init_response_obj=init_response_obj, ) response_cost: float = kwargs.get("response_cost", 0) or 0.0 error_information = StandardLoggingPayloadSetup.get_error_information( original_exception=original_exception, ) ## get final response object ## final_response_obj = StandardLoggingPayloadSetup.get_final_response_obj( response_obj=response_obj, init_response_obj=init_response_obj, kwargs=kwargs, ) stream: Optional[bool] = None if ( kwargs.get("complete_streaming_response") is not None or kwargs.get("async_complete_streaming_response") is not None ): stream = True payload: StandardLoggingPayload = StandardLoggingPayload( id=str(id), trace_id=kwargs.get("litellm_trace_id"), # type: ignore call_type=call_type or "", cache_hit=cache_hit, stream=stream, status=status, custom_llm_provider=cast(Optional[str], kwargs.get("custom_llm_provider")), saved_cache_cost=saved_cache_cost, startTime=start_time_float, endTime=end_time_float, completionStartTime=completion_start_time_float, response_time=response_time, model=kwargs.get("model", "") or "", metadata=clean_metadata, cache_key=clean_hidden_params["cache_key"], response_cost=response_cost, total_tokens=usage.total_tokens, prompt_tokens=usage.prompt_tokens, completion_tokens=usage.completion_tokens, request_tags=request_tags, end_user=end_user_id or "", api_base=StandardLoggingPayloadSetup.strip_trailing_slash( litellm_params.get("api_base", "") ) or "", model_group=_model_group, model_id=_model_id, requester_ip_address=clean_metadata.get("requester_ip_address", None), messages=kwargs.get("messages"), response=final_response_obj, model_parameters=kwargs.get("optional_params", None), hidden_params=clean_hidden_params, model_map_information=model_cost_information, error_str=error_str, error_information=error_information, response_cost_failure_debug_info=kwargs.get( "response_cost_failure_debug_information" ), guardrail_information=metadata.get( "standard_logging_guardrail_information", None ), ) emit_standard_logging_payload(payload) return payload except Exception as e: verbose_logger.exception( "Error creating standard logging object - {}".format(str(e)) ) return None def emit_standard_logging_payload(payload: StandardLoggingPayload): if os.getenv("LITELLM_PRINT_STANDARD_LOGGING_PAYLOAD"): verbose_logger.info(json.dumps(payload, indent=4)) def get_standard_logging_metadata( metadata: Optional[Dict[str, Any]] ) -> StandardLoggingMetadata: """ Clean and filter the metadata dictionary to include only the specified keys in StandardLoggingMetadata. Args: metadata (Optional[Dict[str, Any]]): The original metadata dictionary. Returns: StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata. Note: - If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned. - If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'. 
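# emit_standard_logging_payload is a debug hook: the assembled payload is
# pretty-printed only when the LITELLM_PRINT_STANDARD_LOGGING_PAYLOAD
# environment variable is set, e.g.:
#
#   LITELLM_PRINT_STANDARD_LOGGING_PAYLOAD=1 python my_script.py
#
# or programmatically before the call is made:
#
#   os.environ["LITELLM_PRINT_STANDARD_LOGGING_PAYLOAD"] = "1"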
""" # Initialize with default values clean_metadata = StandardLoggingMetadata( user_api_key_hash=None, user_api_key_alias=None, user_api_key_team_id=None, user_api_key_org_id=None, user_api_key_user_id=None, user_api_key_team_alias=None, spend_logs_metadata=None, requester_ip_address=None, requester_metadata=None, user_api_key_end_user_id=None, prompt_management_metadata=None, ) if isinstance(metadata, dict): # Filter the metadata dictionary to include only the specified keys clean_metadata = StandardLoggingMetadata( **{ # type: ignore key: metadata[key] for key in StandardLoggingMetadata.__annotations__.keys() if key in metadata } ) if metadata.get("user_api_key") is not None: if is_valid_sha256_hash(str(metadata.get("user_api_key"))): clean_metadata["user_api_key_hash"] = metadata.get( "user_api_key" ) # this is the hash return clean_metadata def scrub_sensitive_keys_in_metadata(litellm_params: Optional[dict]): if litellm_params is None: litellm_params = {} metadata = litellm_params.get("metadata", {}) or {} ## check user_api_key_metadata for sensitive logging keys cleaned_user_api_key_metadata = {} if "user_api_key_metadata" in metadata and isinstance( metadata["user_api_key_metadata"], dict ): for k, v in metadata["user_api_key_metadata"].items(): if k == "logging": # prevent logging user logging keys cleaned_user_api_key_metadata[k] = ( "scrubbed_by_litellm_for_sensitive_keys" ) else: cleaned_user_api_key_metadata[k] = v metadata["user_api_key_metadata"] = cleaned_user_api_key_metadata litellm_params["metadata"] = metadata return litellm_params # integration helper function def modify_integration(integration_name, integration_params): global supabaseClient if integration_name == "supabase": if "table_name" in integration_params: Supabase.supabase_table_name = integration_params["table_name"] @lru_cache(maxsize=16) def _get_traceback_str_for_error(error_str: str) -> str: """ function wrapped with lru_cache to limit the number of times `traceback.format_exc()` is called """ return traceback.format_exc() from decimal import Decimal # used for unit testing from typing import Any, Dict, List, Optional, Union def create_dummy_standard_logging_payload() -> StandardLoggingPayload: # First create the nested objects with proper typing model_info = StandardLoggingModelInformation( model_map_key="gpt-3.5-turbo", model_map_value=None ) metadata = StandardLoggingMetadata( # type: ignore user_api_key_hash=str("test_hash"), user_api_key_alias=str("test_alias"), user_api_key_team_id=str("test_team"), user_api_key_user_id=str("test_user"), user_api_key_team_alias=str("test_team_alias"), user_api_key_org_id=None, spend_logs_metadata=None, requester_ip_address=str("127.0.0.1"), requester_metadata=None, user_api_key_end_user_id=str("test_end_user"), ) hidden_params = StandardLoggingHiddenParams( model_id=None, cache_key=None, api_base=None, response_cost=None, additional_headers=None, litellm_overhead_time_ms=None, ) # Convert numeric values to appropriate types response_cost = Decimal("0.1") start_time = Decimal("1234567890.0") end_time = Decimal("1234567891.0") completion_start_time = Decimal("1234567890.5") saved_cache_cost = Decimal("0.0") # Create messages and response with proper typing messages: List[Dict[str, str]] = [{"role": "user", "content": "Hello, world!"}] response: Dict[str, List[Dict[str, Dict[str, str]]]] = { "choices": [{"message": {"content": "Hi there!"}}] } # Main payload initialization return StandardLoggingPayload( # type: ignore id=str("test_id"), call_type=str("completion"), 
stream=bool(False), response_cost=response_cost, response_cost_failure_debug_info=None, status=str("success"), total_tokens=int(30), prompt_tokens=int(20), completion_tokens=int(10), startTime=start_time, endTime=end_time, completionStartTime=completion_start_time, model_map_information=model_info, model=str("gpt-3.5-turbo"), model_id=str("model-123"), model_group=str("openai-gpt"), custom_llm_provider=str("openai"), api_base=str("https://api.openai.com"), metadata=metadata, cache_hit=bool(False), cache_key=None, saved_cache_cost=saved_cache_cost, request_tags=[], end_user=None, requester_ip_address=str("127.0.0.1"), messages=messages, response=response, error_str=None, model_parameters={"stream": True}, hidden_params=hidden_params, )
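# Sketch of how the dummy payload helper might be used in a unit test, assuming
# StandardLoggingPayload supports dict-style access (it is constructed as a
# typed dict above); the test name and assertions are illustrative only.
def _example_test_dummy_payload():
    payload = create_dummy_standard_logging_payload()
    assert payload["model"] == "gpt-3.5-turbo"
    assert payload["status"] == "success"
    assert payload["total_tokens"] == 30
    # hand `payload` to the integration under test from here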