File size: 4,822 Bytes
e3278e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
"""
Handles logging DB success/failure to ServiceLogger()
ServiceLogger() then sends DB logs to Prometheus, OTEL, Datadog etc
"""
import asyncio
from datetime import datetime
from functools import wraps
from typing import Callable, Dict, Tuple
from litellm._service_logger import ServiceTypes
from litellm.litellm_core_utils.core_helpers import (
_get_parent_otel_span_from_kwargs,
get_litellm_metadata_from_kwargs,
)
def log_db_metrics(func):
"""
Decorator to log the duration of a DB related function to ServiceLogger()
Handles logging DB success/failure to ServiceLogger(), which logs to Prometheus, OTEL, Datadog
When logging Failure it checks if the Exception is a PrismaError, httpx.ConnectError or httpx.TimeoutException and then logs that as a DB Service Failure
Args:
func: The function to be decorated
Returns:
Result from the decorated function
Raises:
Exception: If the decorated function raises an exception
"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time: datetime = datetime.now()
try:
result = await func(*args, **kwargs)
end_time: datetime = datetime.now()
from litellm.proxy.proxy_server import proxy_logging_obj
if "PROXY" not in func.__name__:
asyncio.create_task(
proxy_logging_obj.service_logging_obj.async_service_success_hook(
service=ServiceTypes.DB,
call_type=func.__name__,
parent_otel_span=kwargs.get("parent_otel_span", None),
duration=(end_time - start_time).total_seconds(),
start_time=start_time,
end_time=end_time,
event_metadata={
"function_name": func.__name__,
"function_kwargs": kwargs,
"function_args": args,
},
)
)
elif (
# in litellm custom callbacks kwargs is passed as arg[0]
# https://docs.litellm.ai/docs/observability/custom_callback#callback-functions
args is not None
and len(args) > 0
and isinstance(args[0], dict)
):
passed_kwargs = args[0]
parent_otel_span = _get_parent_otel_span_from_kwargs(
kwargs=passed_kwargs
)
if parent_otel_span is not None:
metadata = get_litellm_metadata_from_kwargs(kwargs=passed_kwargs)
asyncio.create_task(
proxy_logging_obj.service_logging_obj.async_service_success_hook(
service=ServiceTypes.BATCH_WRITE_TO_DB,
call_type=func.__name__,
parent_otel_span=parent_otel_span,
duration=0.0,
start_time=start_time,
end_time=end_time,
event_metadata=metadata,
)
)
# end of logging to otel
return result
except Exception as e:
end_time: datetime = datetime.now()
await _handle_logging_db_exception(
e=e,
func=func,
kwargs=kwargs,
args=args,
start_time=start_time,
end_time=end_time,
)
raise e
return wrapper
def _is_exception_related_to_db(e: Exception) -> bool:
"""
Returns True if the exception is related to the DB
"""
import httpx
from prisma.errors import PrismaError
return isinstance(e, (PrismaError, httpx.ConnectError, httpx.TimeoutException))
async def _handle_logging_db_exception(
e: Exception,
func: Callable,
kwargs: Dict,
args: Tuple,
start_time: datetime,
end_time: datetime,
) -> None:
from litellm.proxy.proxy_server import proxy_logging_obj
# don't log this as a DB Service Failure, if the DB did not raise an exception
if _is_exception_related_to_db(e) is not True:
return
await proxy_logging_obj.service_logging_obj.async_service_failure_hook(
error=e,
service=ServiceTypes.DB,
call_type=func.__name__,
parent_otel_span=kwargs.get("parent_otel_span"),
duration=(end_time - start_time).total_seconds(),
start_time=start_time,
end_time=end_time,
event_metadata={
"function_name": func.__name__,
"function_kwargs": kwargs,
"function_args": args,
},
)
|