""" Implements logging integration with Datadog's LLM Observability Service API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards """ import asyncio import json import os import uuid from datetime import datetime from typing import Any, Dict, List, Optional, Union import litellm from litellm._logging import verbose_logger from litellm.integrations.custom_batch_logger import CustomBatchLogger from litellm.integrations.datadog.datadog import DataDogLogger from litellm.llms.custom_httpx.http_handler import ( get_async_httpx_client, httpxSpecialProvider, ) from litellm.types.integrations.datadog_llm_obs import * from litellm.types.utils import StandardLoggingPayload class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger): def __init__(self, **kwargs): try: verbose_logger.debug("DataDogLLMObs: Initializing logger") if os.getenv("DD_API_KEY", None) is None: raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'") if os.getenv("DD_SITE", None) is None: raise Exception( "DD_SITE is not set, set 'DD_SITE=<>', example sit = `us5.datadoghq.com`" ) self.async_client = get_async_httpx_client( llm_provider=httpxSpecialProvider.LoggingCallback ) self.DD_API_KEY = os.getenv("DD_API_KEY") self.DD_SITE = os.getenv("DD_SITE") self.intake_url = ( f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans" ) # testing base url dd_base_url = os.getenv("DD_BASE_URL") if dd_base_url: self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans" asyncio.create_task(self.periodic_flush()) self.flush_lock = asyncio.Lock() self.log_queue: List[LLMObsPayload] = [] CustomBatchLogger.__init__(self, **kwargs, flush_lock=self.flush_lock) except Exception as e: verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}") raise e async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): try: verbose_logger.debug( f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}" ) payload = self.create_llm_obs_payload( kwargs, response_obj, start_time, end_time ) verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}") self.log_queue.append(payload) if len(self.log_queue) >= self.batch_size: await self.async_send_batch() except Exception as e: verbose_logger.exception( f"DataDogLLMObs: Error logging success event - {str(e)}" ) async def async_send_batch(self): try: if not self.log_queue: return verbose_logger.debug( f"DataDogLLMObs: Flushing {len(self.log_queue)} events" ) # Prepare the payload payload = { "data": DDIntakePayload( type="span", attributes=DDSpanAttributes( ml_app=self._get_datadog_service(), tags=[self._get_datadog_tags()], spans=self.log_queue, ), ), } verbose_logger.debug("payload %s", json.dumps(payload, indent=4)) response = await self.async_client.post( url=self.intake_url, json=payload, headers={ "DD-API-KEY": self.DD_API_KEY, "Content-Type": "application/json", }, ) response.raise_for_status() if response.status_code != 202: raise Exception( f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}" ) verbose_logger.debug( f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" ) self.log_queue.clear() except Exception as e: verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}") def create_llm_obs_payload( self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime ) -> LLMObsPayload: standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get( "standard_logging_object" ) if standard_logging_payload is None: raise Exception("DataDogLLMObs: standard_logging_object is not set") messages = standard_logging_payload["messages"] messages = self._ensure_string_content(messages=messages) metadata = kwargs.get("litellm_params", {}).get("metadata", {}) input_meta = InputMeta(messages=messages) # type: ignore output_meta = OutputMeta(messages=self._get_response_messages(response_obj)) meta = Meta( kind="llm", input=input_meta, output=output_meta, metadata=self._get_dd_llm_obs_payload_metadata(standard_logging_payload), ) # Calculate metrics (you may need to adjust these based on available data) metrics = LLMMetrics( input_tokens=float(standard_logging_payload.get("prompt_tokens", 0)), output_tokens=float(standard_logging_payload.get("completion_tokens", 0)), total_tokens=float(standard_logging_payload.get("total_tokens", 0)), ) return LLMObsPayload( parent_id=metadata.get("parent_id", "undefined"), trace_id=metadata.get("trace_id", str(uuid.uuid4())), span_id=metadata.get("span_id", str(uuid.uuid4())), name=metadata.get("name", "litellm_llm_call"), meta=meta, start_ns=int(start_time.timestamp() * 1e9), duration=int((end_time - start_time).total_seconds() * 1e9), metrics=metrics, tags=[ self._get_datadog_tags(standard_logging_object=standard_logging_payload) ], ) def _get_response_messages(self, response_obj: Any) -> List[Any]: """ Get the messages from the response object for now this handles logging /chat/completions responses """ if isinstance(response_obj, litellm.ModelResponse): return [response_obj["choices"][0]["message"].json()] return [] def _ensure_string_content( self, messages: Optional[Union[str, List[Any], Dict[Any, Any]]] ) -> List[Any]: if messages is None: return [] if isinstance(messages, str): return [messages] elif isinstance(messages, list): return [message for message in messages] elif isinstance(messages, dict): return [str(messages.get("content", ""))] return [] def _get_dd_llm_obs_payload_metadata( self, standard_logging_payload: StandardLoggingPayload ) -> Dict: _metadata = { "model_name": standard_logging_payload.get("model", "unknown"), "model_provider": standard_logging_payload.get( "custom_llm_provider", "unknown" ), } _standard_logging_metadata: dict = ( dict(standard_logging_payload.get("metadata", {})) or {} ) _metadata.update(_standard_logging_metadata) return _metadata