File size: 7,749 Bytes
e3278e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
"""
Implements logging integration with Datadog's LLM Observability Service
API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards
"""
import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.integrations.datadog.datadog import DataDogLogger
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.types.integrations.datadog_llm_obs import *
from litellm.types.utils import StandardLoggingPayload
class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger):
    """Batching logger that ships LiteLLM call spans to Datadog LLM Observability.

    Each successful call is converted into an ``LLMObsPayload`` span and
    appended to ``self.log_queue``. When the queue reaches ``self.batch_size``,
    or when the periodic flush task started in ``__init__`` runs, the queued
    spans are POSTed in a single request to the LLM Obs intake endpoint.

    Environment variables read in ``__init__``:
        DD_API_KEY: required; sent as the ``DD-API-KEY`` request header.
        DD_SITE: required; used to build ``https://api.<DD_SITE>/...``.
        DD_BASE_URL: optional; replaces the ``https://api.<DD_SITE>`` prefix
            (intended for testing).
    """

    def __init__(self, **kwargs):
        """Read Datadog settings from the environment and start periodic flushing.

        Args:
            **kwargs: forwarded to ``CustomBatchLogger.__init__`` together with
                the ``flush_lock`` created here.

        Raises:
            Exception: if ``DD_API_KEY`` or ``DD_SITE`` is unset, or if any other
                initialisation step fails. The error is logged, then re-raised.
                Note that ``asyncio.create_task`` raises ``RuntimeError`` when
                no event loop is running.
        """
        try:
            verbose_logger.debug("DataDogLLMObs: Initializing logger")
            if os.getenv("DD_API_KEY", None) is None:
                raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'")
            if os.getenv("DD_SITE", None) is None:
                raise Exception(
                    "DD_SITE is not set, set 'DD_SITE=<>', example site = `us5.datadoghq.com`"
                )

            self.async_client = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback
            )
            self.DD_API_KEY = os.getenv("DD_API_KEY")
            self.DD_SITE = os.getenv("DD_SITE")
            self.intake_url = (
                f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans"
            )

            # Testing base url: DD_BASE_URL replaces the scheme + host entirely.
            dd_base_url = os.getenv("DD_BASE_URL")
            if dd_base_url:
                self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans"

            # Create every piece of state the flush path touches (lock, queue,
            # and whatever CustomBatchLogger.__init__ sets up) *before* the
            # periodic flush task is scheduled, so that task can never observe
            # a half-initialised logger.
            self.flush_lock = asyncio.Lock()
            self.log_queue: List[LLMObsPayload] = []
            CustomBatchLogger.__init__(self, **kwargs, flush_lock=self.flush_lock)
            asyncio.create_task(self.periodic_flush())
        except Exception as e:
            verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}")
            raise e

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Queue a span for a successful call and send the batch once it is full.

        Any error is logged and swallowed, so a logging failure does not
        propagate to the caller.
        """
        try:
            verbose_logger.debug(
                f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}"
            )
            payload = self.create_llm_obs_payload(
                kwargs, response_obj, start_time, end_time
            )
            verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}")
            self.log_queue.append(payload)
            # self.batch_size is presumably set by CustomBatchLogger.__init__.
            if len(self.log_queue) >= self.batch_size:
                await self.async_send_batch()
        except Exception as e:
            verbose_logger.exception(
                f"DataDogLLMObs: Error logging success event - {str(e)}"
            )

    async def async_send_batch(self):
        """POST all queued spans to the Datadog LLM Obs intake in one request.

        The current queue is detached *before* the request is awaited. Spans
        appended by other coroutines while the request is in flight therefore
        stay queued for the next batch instead of being discarded.

        If sending fails, or the response status is not 202, the detached
        spans are put back at the front of the queue in their original order,
        so a later flush retries them. Errors are logged, not raised.
        """
        if not self.log_queue:
            return

        # Swap the queue out without yielding to the event loop. From here on,
        # new spans go into the fresh list and cannot be lost by this send.
        batch = self.log_queue
        self.log_queue = []
        try:
            verbose_logger.debug(f"DataDogLLMObs: Flushing {len(batch)} events")

            payload = {
                "data": DDIntakePayload(
                    type="span",
                    attributes=DDSpanAttributes(
                        ml_app=self._get_datadog_service(),
                        tags=[self._get_datadog_tags()],
                        spans=batch,
                    ),
                ),
            }
            verbose_logger.debug("payload %s", json.dumps(payload, indent=4))

            response = await self.async_client.post(
                url=self.intake_url,
                json=payload,
                headers={
                    "DD-API-KEY": self.DD_API_KEY,
                    "Content-Type": "application/json",
                },
            )
            # Anything but 202 Accepted is a failure. This check covers 4xx/5xx
            # as well, and keeps the response body in the error message.
            if response.status_code != 202:
                raise Exception(
                    f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}"
                )

            verbose_logger.debug(
                f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}"
            )
        except Exception as e:
            # Re-queue the unsent spans ahead of anything logged meanwhile.
            # NOTE(review): as before, failed spans are retained without a cap,
            # so a persistently failing endpoint lets the queue grow unbounded.
            self.log_queue[:0] = batch
            verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}")

    def create_llm_obs_payload(
        self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime
    ) -> LLMObsPayload:
        """Build one Datadog LLM Obs span from LiteLLM callback arguments.

        Args:
            kwargs: callback kwargs. ``standard_logging_object`` is required.
                ``litellm_params["metadata"]`` may optionally supply
                ``parent_id``, ``trace_id``, ``span_id`` and ``name``. Either
                level may be missing or ``None``.
            response_obj: the call's response. Output messages are only
                extracted from ``litellm.ModelResponse`` instances.
            start_time: call start; becomes ``start_ns`` (nanoseconds since epoch).
            end_time: call end; ``end_time - start_time`` becomes ``duration`` (ns).

        Returns:
            The span payload appended to the batch queue.

        Raises:
            Exception: if ``standard_logging_object`` is missing from ``kwargs``.
        """
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object"
        )
        if standard_logging_payload is None:
            raise Exception("DataDogLLMObs: standard_logging_object is not set")

        messages = self._ensure_string_content(
            messages=standard_logging_payload.get("messages")
        )
        # Either key may be present with an explicit None value.
        litellm_params = kwargs.get("litellm_params") or {}
        metadata = litellm_params.get("metadata") or {}

        input_meta = InputMeta(messages=messages)  # type: ignore
        output_meta = OutputMeta(messages=self._get_response_messages(response_obj))

        meta = Meta(
            kind="llm",
            input=input_meta,
            output=output_meta,
            metadata=self._get_dd_llm_obs_payload_metadata(standard_logging_payload),
        )

        # Token counts may be absent or None; treat both as 0.
        metrics = LLMMetrics(
            input_tokens=float(standard_logging_payload.get("prompt_tokens") or 0),
            output_tokens=float(standard_logging_payload.get("completion_tokens") or 0),
            total_tokens=float(standard_logging_payload.get("total_tokens") or 0),
        )

        return LLMObsPayload(
            parent_id=metadata.get("parent_id", "undefined"),
            trace_id=metadata.get("trace_id", str(uuid.uuid4())),
            span_id=metadata.get("span_id", str(uuid.uuid4())),
            name=metadata.get("name", "litellm_llm_call"),
            meta=meta,
            start_ns=int(start_time.timestamp() * 1e9),
            duration=int((end_time - start_time).total_seconds() * 1e9),
            metrics=metrics,
            tags=[
                self._get_datadog_tags(standard_logging_object=standard_logging_payload)
            ],
        )

    def _get_response_messages(self, response_obj: Any) -> List[Any]:
        """Return the first choice's message, as ``[message.json()]``.

        Only ``litellm.ModelResponse`` (i.e. /chat/completions responses) is
        handled. Any other response type, or a response with no choices,
        yields an empty list.
        """
        if isinstance(response_obj, litellm.ModelResponse):
            choices = response_obj["choices"]
            if choices:
                return [choices[0]["message"].json()]
        return []

    def _ensure_string_content(
        self, messages: Optional[Union[str, List[Any], Dict[Any, Any]]]
    ) -> List[Any]:
        """Normalise the request's ``messages`` into a list.

        - ``None`` becomes ``[]``.
        - A ``str`` becomes a one-element list.
        - A ``list`` is shallow-copied with its items unchanged (they are not
          stringified).
        - A ``dict`` becomes ``[str(dict.get("content", ""))]``.
        - Any other type becomes ``[]``.
        """
        if messages is None:
            return []
        if isinstance(messages, str):
            return [messages]
        if isinstance(messages, list):
            return list(messages)
        if isinstance(messages, dict):
            return [str(messages.get("content", ""))]
        return []

    def _get_dd_llm_obs_payload_metadata(
        self, standard_logging_payload: StandardLoggingPayload
    ) -> Dict:
        """Build the span's ``meta.metadata`` dict.

        It starts with ``model_name`` and ``model_provider`` (each defaulting
        to ``"unknown"``), then merges in the standard logging ``metadata``
        dict. On key collision, entries from ``metadata`` win. A missing or
        ``None`` ``metadata`` is treated as empty.
        """
        _metadata = {
            "model_name": standard_logging_payload.get("model", "unknown"),
            "model_provider": standard_logging_payload.get(
                "custom_llm_provider", "unknown"
            ),
        }
        _standard_logging_metadata: dict = dict(
            standard_logging_payload.get("metadata") or {}
        )
        _metadata.update(_standard_logging_metadata)
        return _metadata
|