File size: 7,749 Bytes
e3278e4 |
|
"""
Implements logging integration with Datadog's LLM Observability Service
API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards
"""
import asyncio
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.integrations.datadog.datadog import DataDogLogger
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.types.integrations.datadog_llm_obs import *
from litellm.types.utils import StandardLoggingPayload
class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger):
def __init__(self, **kwargs):
try:
verbose_logger.debug("DataDogLLMObs: Initializing logger")
if os.getenv("DD_API_KEY", None) is None:
raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'")
if os.getenv("DD_SITE", None) is None:
raise Exception(
"DD_SITE is not set, set 'DD_SITE=<>', example sit = `us5.datadoghq.com`"
)
self.async_client = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback
)
self.DD_API_KEY = os.getenv("DD_API_KEY")
self.DD_SITE = os.getenv("DD_SITE")
self.intake_url = (
f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans"
)
# testing base url
dd_base_url = os.getenv("DD_BASE_URL")
if dd_base_url:
self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans"
asyncio.create_task(self.periodic_flush())
self.flush_lock = asyncio.Lock()
self.log_queue: List[LLMObsPayload] = []
CustomBatchLogger.__init__(self, **kwargs, flush_lock=self.flush_lock)
except Exception as e:
verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}")
raise e
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}"
)
payload = self.create_llm_obs_payload(
kwargs, response_obj, start_time, end_time
)
verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}")
self.log_queue.append(payload)
if len(self.log_queue) >= self.batch_size:
await self.async_send_batch()
except Exception as e:
verbose_logger.exception(
f"DataDogLLMObs: Error logging success event - {str(e)}"
)
async def async_send_batch(self):
try:
if not self.log_queue:
return
verbose_logger.debug(
f"DataDogLLMObs: Flushing {len(self.log_queue)} events"
)
# Prepare the payload
payload = {
"data": DDIntakePayload(
type="span",
attributes=DDSpanAttributes(
ml_app=self._get_datadog_service(),
tags=[self._get_datadog_tags()],
spans=self.log_queue,
),
),
}
verbose_logger.debug("payload %s", json.dumps(payload, indent=4))
response = await self.async_client.post(
url=self.intake_url,
json=payload,
headers={
"DD-API-KEY": self.DD_API_KEY,
"Content-Type": "application/json",
},
)
response.raise_for_status()
if response.status_code != 202:
raise Exception(
f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}"
)
verbose_logger.debug(
f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}"
)
self.log_queue.clear()
except Exception as e:
verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}")
def create_llm_obs_payload(
self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime
) -> LLMObsPayload:
standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
"standard_logging_object"
)
if standard_logging_payload is None:
raise Exception("DataDogLLMObs: standard_logging_object is not set")
messages = standard_logging_payload["messages"]
messages = self._ensure_string_content(messages=messages)
metadata = kwargs.get("litellm_params", {}).get("metadata", {})
input_meta = InputMeta(messages=messages) # type: ignore
output_meta = OutputMeta(messages=self._get_response_messages(response_obj))
meta = Meta(
kind="llm",
input=input_meta,
output=output_meta,
metadata=self._get_dd_llm_obs_payload_metadata(standard_logging_payload),
)
# Calculate metrics (you may need to adjust these based on available data)
metrics = LLMMetrics(
input_tokens=float(standard_logging_payload.get("prompt_tokens", 0)),
output_tokens=float(standard_logging_payload.get("completion_tokens", 0)),
total_tokens=float(standard_logging_payload.get("total_tokens", 0)),
)
return LLMObsPayload(
parent_id=metadata.get("parent_id", "undefined"),
trace_id=metadata.get("trace_id", str(uuid.uuid4())),
span_id=metadata.get("span_id", str(uuid.uuid4())),
name=metadata.get("name", "litellm_llm_call"),
meta=meta,
start_ns=int(start_time.timestamp() * 1e9),
duration=int((end_time - start_time).total_seconds() * 1e9),
metrics=metrics,
tags=[
self._get_datadog_tags(standard_logging_object=standard_logging_payload)
],
)
def _get_response_messages(self, response_obj: Any) -> List[Any]:
"""
Get the messages from the response object
for now this handles logging /chat/completions responses
"""
if isinstance(response_obj, litellm.ModelResponse):
return [response_obj["choices"][0]["message"].json()]
return []
def _ensure_string_content(
self, messages: Optional[Union[str, List[Any], Dict[Any, Any]]]
) -> List[Any]:
if messages is None:
return []
if isinstance(messages, str):
return [messages]
elif isinstance(messages, list):
return [message for message in messages]
elif isinstance(messages, dict):
return [str(messages.get("content", ""))]
return []
def _get_dd_llm_obs_payload_metadata(
self, standard_logging_payload: StandardLoggingPayload
) -> Dict:
_metadata = {
"model_name": standard_logging_payload.get("model", "unknown"),
"model_provider": standard_logging_payload.get(
"custom_llm_provider", "unknown"
),
}
_standard_logging_metadata: dict = (
dict(standard_logging_payload.get("metadata", {})) or {}
)
_metadata.update(_standard_logging_metadata)
return _metadata
|