"""
DataDog Integration - sends logs to /api/v2/logs

DD API Reference: https://docs.datadoghq.com/api/latest/logs

`async_log_success_event` - used by the litellm proxy to send logs to Datadog
`log_success_event` - sync version of logging to Datadog; only used by the litellm Python SDK when the user opts in to sync functions

async_log_success_event: stores up to DD_MAX_BATCH_SIZE events in an in-memory queue and flushes to Datadog once the queue is full or on the periodic flush interval (`flush_interval`)

async_service_failure_hook: logs failures from Redis, Postgres (adjacent systems) as 'WARNING' on Datadog

For batching specifics see the CustomBatchLogger class
"""

import asyncio
import datetime
import json
import os
import traceback
import uuid
from datetime import datetime as datetimeObj
from typing import Any, List, Optional, Union

import httpx
from httpx import Response

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.custom_httpx.http_handler import (
    _get_httpx_client,
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.types.integrations.base_health_check import IntegrationHealthCheckStatus
from litellm.types.integrations.datadog import *
from litellm.types.services import ServiceLoggerPayload
from litellm.types.utils import StandardLoggingPayload

from ..base_health_check import HealthCheckIntegration

DD_MAX_BATCH_SIZE = 1000

class DataDogLogger(
    CustomBatchLogger,
    HealthCheckIntegration,
):
    def __init__(
        self,
        **kwargs,
    ):
        """
        Initializes the Datadog logger and checks that the required env variables are set

        Required environment variables:
        `DD_API_KEY` - your Datadog API key
        `DD_SITE` - your Datadog site, example = `"us5.datadoghq.com"`
        """
        try:
            verbose_logger.debug("Datadog: in init datadog logger")

            if os.getenv("DD_API_KEY", None) is None:
                raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'")
            if os.getenv("DD_SITE", None) is None:
                raise Exception("DD_SITE is not set in .env, set 'DD_SITE=<>'")
            self.async_client = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback
            )
            self.DD_API_KEY = os.getenv("DD_API_KEY")
            self.intake_url = (
                f"https://http-intake.logs.{os.getenv('DD_SITE')}/api/v2/logs"
            )

            dd_base_url: Optional[str] = (
                os.getenv("_DATADOG_BASE_URL")
                or os.getenv("DATADOG_BASE_URL")
                or os.getenv("DD_BASE_URL")
            )
            if dd_base_url is not None:
                self.intake_url = f"{dd_base_url}/api/v2/logs"

            self.sync_client = _get_httpx_client()
            asyncio.create_task(self.periodic_flush())
            self.flush_lock = asyncio.Lock()
            super().__init__(
                **kwargs, flush_lock=self.flush_lock, batch_size=DD_MAX_BATCH_SIZE
            )
        except Exception as e:
            verbose_logger.exception(
                f"Datadog: Got exception on init Datadog client {str(e)}"
            )
            raise e

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Async log success events to Datadog

        - Creates a Datadog payload
        - Adds the payload to the in-memory log queue
        - The queue is flushed every `flush_interval` seconds or when it reaches DD_MAX_BATCH_SIZE events

        Raises:
            Raises a NON-blocking verbose_logger.exception if an error occurs
        """
        try:
            verbose_logger.debug(
                "Datadog: Logging - Enters logging function for model %s",
                kwargs.get("model", ""),
            )
            await self._log_async_event(kwargs, response_obj, start_time, end_time)

        except Exception as e:
            verbose_logger.exception(
                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        try:
            verbose_logger.debug(
                "Datadog: Logging - Enters logging function for model %s",
                kwargs.get("model", ""),
            )
            await self._log_async_event(kwargs, response_obj, start_time, end_time)

        except Exception as e:
            verbose_logger.exception(
                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
            )

    async def async_send_batch(self):
        """
        Sends the in-memory log queue to the Datadog API

        Logs are sent to /api/v2/logs

        DD Ref: https://docs.datadoghq.com/api/latest/logs/

        Raises:
            Raises a NON-blocking verbose_logger.exception if an error occurs
        """
        try:
            if not self.log_queue:
                verbose_logger.debug("Datadog: log_queue is empty; nothing to flush")
                return

            verbose_logger.debug(
                "Datadog - about to flush %s events on %s",
                len(self.log_queue),
                self.intake_url,
            )

            response = await self.async_send_compressed_data(self.log_queue)
            if response.status_code == 413:
                verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value)
                return

            response.raise_for_status()
            if response.status_code != 202:
                raise Exception(
                    f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
                )

            verbose_logger.debug(
                "Datadog: Response from datadog API status_code: %s, text: %s",
                response.status_code,
                response.text,
            )
        except Exception as e:
            verbose_logger.exception(
                f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}"
            )

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Sync log success events to Datadog

        - Creates a Datadog payload
        - Immediately posts it to the DD API (no batching)
        """
        try:
            if litellm.datadog_use_v1 is True:
                dd_payload = self._create_v0_logging_payload(
                    kwargs=kwargs,
                    response_obj=response_obj,
                    start_time=start_time,
                    end_time=end_time,
                )
            else:
                dd_payload = self.create_datadog_logging_payload(
                    kwargs=kwargs,
                    response_obj=response_obj,
                    start_time=start_time,
                    end_time=end_time,
                )

            response = self.sync_client.post(
                url=self.intake_url,
                json=dd_payload,
                headers={
                    "DD-API-KEY": self.DD_API_KEY,
                },
            )

            response.raise_for_status()
            if response.status_code != 202:
                raise Exception(
                    f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
                )

            verbose_logger.debug(
                "Datadog: Response from datadog API status_code: %s, text: %s",
                response.status_code,
                response.text,
            )

        except Exception as e:
            verbose_logger.exception(
                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
            )

    async def _log_async_event(self, kwargs, response_obj, start_time, end_time):
        dd_payload = self.create_datadog_logging_payload(
            kwargs=kwargs,
            response_obj=response_obj,
            start_time=start_time,
            end_time=end_time,
        )

        self.log_queue.append(dd_payload)
        verbose_logger.debug(
            f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
        )

        if len(self.log_queue) >= self.batch_size:
            await self.async_send_batch()

    def _create_datadog_logging_payload_helper(
        self,
        standard_logging_object: StandardLoggingPayload,
        status: DataDogStatus,
    ) -> DatadogPayload:
        json_payload = json.dumps(standard_logging_object, default=str)
        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
        dd_payload = DatadogPayload(
            ddsource=self._get_datadog_source(),
            ddtags=self._get_datadog_tags(
                standard_logging_object=standard_logging_object
            ),
            hostname=self._get_datadog_hostname(),
            message=json_payload,
            service=self._get_datadog_service(),
            status=status,
        )
        return dd_payload

    def create_datadog_logging_payload(
        self,
        kwargs: Union[dict, Any],
        response_obj: Any,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
    ) -> DatadogPayload:
        """
        Helper function to create a Datadog payload for logging

        Args:
            kwargs (Union[dict, Any]): request kwargs
            response_obj (Any): llm api response
            start_time (datetime.datetime): start time of request
            end_time (datetime.datetime): end time of request

        Returns:
            DatadogPayload: defined in types.py
        """
        standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object", None
        )
        if standard_logging_object is None:
            raise ValueError("standard_logging_object not found in kwargs")

        status = DataDogStatus.INFO
        if standard_logging_object.get("status") == "failure":
            status = DataDogStatus.ERROR

        self.truncate_standard_logging_payload_content(standard_logging_object)

        dd_payload = self._create_datadog_logging_payload_helper(
            standard_logging_object=standard_logging_object,
            status=status,
        )
        return dd_payload

    async def async_send_compressed_data(self, data: List) -> Response:
        """
        Async helper to send compressed data to self.intake_url

        Datadog recommends using gzip to compress data
        https://docs.datadoghq.com/api/latest/logs/

        "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
        """
        import gzip

        compressed_data = gzip.compress(json.dumps(data, default=str).encode("utf-8"))
        response = await self.async_client.post(
            url=self.intake_url,
            data=compressed_data,
            headers={
                "DD-API-KEY": self.DD_API_KEY,
                "Content-Encoding": "gzip",
                "Content-Type": "application/json",
            },
        )
        return response

    async def async_service_failure_hook(
        self,
        payload: ServiceLoggerPayload,
        error: Optional[str] = "",
        parent_otel_span: Optional[Any] = None,
        start_time: Optional[Union[datetimeObj, float]] = None,
        end_time: Optional[Union[float, datetimeObj]] = None,
        event_metadata: Optional[dict] = None,
    ):
        """
        Logs failures from Redis, Postgres (adjacent systems) as 'WARNING' on Datadog

        - example - Redis is failing / erroring, will be logged on Datadog
        """
        try:
            _payload_dict = payload.model_dump()
            _dd_message_str = json.dumps(_payload_dict, default=str)
            _dd_payload = DatadogPayload(
                ddsource="litellm",
                ddtags="",
                hostname="",
                message=_dd_message_str,
                service="litellm-server",
                status=DataDogStatus.WARN,
            )

            self.log_queue.append(_dd_payload)

        except Exception as e:
            verbose_logger.exception(
                f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
            )

    async def async_service_success_hook(
        self,
        payload: ServiceLoggerPayload,
        error: Optional[str] = "",
        parent_otel_span: Optional[Any] = None,
        start_time: Optional[Union[datetimeObj, float]] = None,
        end_time: Optional[Union[float, datetimeObj]] = None,
        event_metadata: Optional[dict] = None,
    ):
        """
        Logs successes from Redis, Postgres (adjacent systems) as 'INFO' on Datadog

        No user has asked for this so far, and it might be spammy on Datadog. If the need arises we can implement it.
        """
        return

    def _create_v0_logging_payload(
        self,
        kwargs: Union[dict, Any],
        response_obj: Any,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
    ) -> DatadogPayload:
        """
        Note: This is the legacy (pre-StandardLoggingPayload) version of the DataDog logging payload

        (Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True`
        """
        litellm_params = kwargs.get("litellm_params", {})
        metadata = (
            litellm_params.get("metadata", {}) or {}
        )  # if litellm_params['metadata'] is None, default to {}
        messages = kwargs.get("messages")
        optional_params = kwargs.get("optional_params", {})
        call_type = kwargs.get("call_type", "litellm.completion")
        cache_hit = kwargs.get("cache_hit", False)
        usage = dict(response_obj["usage"])
        id = response_obj.get("id", str(uuid.uuid4()))
        try:
            response_time = (end_time - start_time).total_seconds() * 1000
        except Exception:
            response_time = None

        try:
            response_obj = dict(response_obj)
        except Exception:
            response_obj = response_obj

        # Drop internal keys that should not be forwarded to Datadog
        clean_metadata = {}
        if isinstance(metadata, dict):
            for key, value in metadata.items():
                if key in [
                    "endpoint",
                    "caching_groups",
                    "previous_models",
                ]:
                    continue
                clean_metadata[key] = value

        payload = {
            "id": id,
            "call_type": call_type,
            "cache_hit": cache_hit,
            "start_time": start_time,
            "end_time": end_time,
            "response_time": response_time,
            "model": kwargs.get("model", ""),
            "user": kwargs.get("user", ""),
            "model_parameters": optional_params,
            "spend": kwargs.get("response_cost", 0),
            "messages": messages,
            "response": response_obj,
            "usage": usage,
            "metadata": clean_metadata,
        }

        json_payload = json.dumps(payload, default=str)

        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)

        dd_payload = DatadogPayload(
            ddsource=self._get_datadog_source(),
            ddtags=self._get_datadog_tags(),
            hostname=self._get_datadog_hostname(),
            message=json_payload,
            service=self._get_datadog_service(),
            status=DataDogStatus.INFO,
        )
        return dd_payload

    @staticmethod
    def _get_datadog_tags(
        standard_logging_object: Optional[StandardLoggingPayload] = None,
    ) -> str:
        """
        Get the Datadog tags for the request

        DD tags need to be a comma-separated string of `key:value` pairs, e.g.:
        - tags: ["user_handle:[email protected]", "app_version:1.0.0"]
        """
        base_tags = {
            "env": os.getenv("DD_ENV", "unknown"),
            "service": os.getenv("DD_SERVICE", "litellm"),
            "version": os.getenv("DD_VERSION", "unknown"),
            "HOSTNAME": DataDogLogger._get_datadog_hostname(),
            "POD_NAME": os.getenv("POD_NAME", "unknown"),
        }

        tags = [f"{k}:{v}" for k, v in base_tags.items()]

        if standard_logging_object:
            _request_tags: List[str] = (
                standard_logging_object.get("request_tags", []) or []
            )
            request_tags = [f"request_tag:{tag}" for tag in _request_tags]
            tags.extend(request_tags)

        return ",".join(tags)

    @staticmethod
    def _get_datadog_source():
        return os.getenv("DD_SOURCE", "litellm")

    @staticmethod
    def _get_datadog_service():
        return os.getenv("DD_SERVICE", "litellm-server")

    @staticmethod
    def _get_datadog_hostname():
        return os.getenv("HOSTNAME", "")

    @staticmethod
    def _get_datadog_env():
        return os.getenv("DD_ENV", "unknown")

    @staticmethod
    def _get_datadog_pod_name():
        return os.getenv("POD_NAME", "unknown")

    async def async_health_check(self) -> IntegrationHealthCheckStatus:
        """
        Check if the Datadog integration is healthy by sending a dummy payload
        """
        from litellm.litellm_core_utils.litellm_logging import (
            create_dummy_standard_logging_payload,
        )

        standard_logging_object = create_dummy_standard_logging_payload()
        dd_payload = self._create_datadog_logging_payload_helper(
            standard_logging_object=standard_logging_object,
            status=DataDogStatus.INFO,
        )
        log_queue = [dd_payload]
        response = await self.async_send_compressed_data(log_queue)
        try:
            response.raise_for_status()
            return IntegrationHealthCheckStatus(
                status="healthy",
                error_message=None,
            )
        except httpx.HTTPStatusError as e:
            return IntegrationHealthCheckStatus(
                status="unhealthy",
                error_message=e.response.text,
            )
        except Exception as e:
            return IntegrationHealthCheckStatus(
                status="unhealthy",
                error_message=str(e),
            )