|
""" |
|
Handles logging DB success/failure to ServiceLogger() |
|
|
|
ServiceLogger() then sends DB logs to Prometheus, OTEL, Datadog etc |
|
""" |
|
|
|
import asyncio |
|
from datetime import datetime |
|
from functools import wraps |
|
from typing import Callable, Dict, Tuple |
|
|
|
from litellm._service_logger import ServiceTypes |
|
from litellm.litellm_core_utils.core_helpers import ( |
|
_get_parent_otel_span_from_kwargs, |
|
get_litellm_metadata_from_kwargs, |
|
) |
|
|
|
|
|
def log_db_metrics(func):
    """
    Decorator to log the duration of a DB related function to ServiceLogger()

    Handles logging DB success/failure to ServiceLogger(), which logs to Prometheus, OTEL, Datadog

    On success, logging is scheduled as a background task; the decorated call
    does not wait for it:

    - If "PROXY" is not in the function's name, the call is logged as a
      ``ServiceTypes.DB`` success together with its measured duration.
    - Otherwise, if the first positional argument is a dict, that dict is
      treated as the kwargs payload. A ``ServiceTypes.BATCH_WRITE_TO_DB``
      success is logged only when a parent OTEL span can be extracted from it.

    When logging Failure it checks if the Exception is a PrismaError, httpx.ConnectError or httpx.TimeoutException and then logs that as a DB Service Failure

    Args:
        func: The async function to be decorated

    Returns:
        An async wrapper that awaits ``func`` and returns its result

    Raises:
        Exception: If the decorated function raises an exception, it is
            re-raised unchanged after failure logging has been attempted
    """

    # The event loop only keeps weak references to tasks, so a fire-and-forget
    # task may be garbage collected before it finishes. Hold a strong
    # reference to every logging task until it completes.
    pending_log_tasks: set = set()

    def _schedule_log(coro) -> None:
        """Run ``coro`` as a background task and keep it alive until done."""
        task = asyncio.create_task(coro)
        pending_log_tasks.add(task)
        task.add_done_callback(pending_log_tasks.discard)

    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time: datetime = datetime.now()

        try:
            result = await func(*args, **kwargs)
            end_time: datetime = datetime.now()
            # NOTE(review): imported at call time rather than module level,
            # presumably to avoid a circular import - confirm.
            from litellm.proxy.proxy_server import proxy_logging_obj

            if "PROXY" not in func.__name__:
                _schedule_log(
                    proxy_logging_obj.service_logging_obj.async_service_success_hook(
                        service=ServiceTypes.DB,
                        call_type=func.__name__,
                        parent_otel_span=kwargs.get("parent_otel_span", None),
                        duration=(end_time - start_time).total_seconds(),
                        start_time=start_time,
                        end_time=end_time,
                        event_metadata={
                            "function_name": func.__name__,
                            "function_kwargs": kwargs,
                            "function_args": args,
                        },
                    )
                )
            elif args and isinstance(args[0], dict):
                # Here the first positional argument is used as the kwargs
                # payload from which the OTEL span and metadata are read.
                passed_kwargs = args[0]
                parent_otel_span = _get_parent_otel_span_from_kwargs(
                    kwargs=passed_kwargs
                )
                if parent_otel_span is not None:
                    metadata = get_litellm_metadata_from_kwargs(kwargs=passed_kwargs)

                    _schedule_log(
                        proxy_logging_obj.service_logging_obj.async_service_success_hook(
                            service=ServiceTypes.BATCH_WRITE_TO_DB,
                            call_type=func.__name__,
                            parent_otel_span=parent_otel_span,
                            duration=0.0,
                            start_time=start_time,
                            end_time=end_time,
                            event_metadata=metadata,
                        )
                    )

            return result
        except Exception as e:
            end_time = datetime.now()
            await _handle_logging_db_exception(
                e=e,
                func=func,
                kwargs=kwargs,
                args=args,
                start_time=start_time,
                end_time=end_time,
            )
            # Bare raise re-raises the active exception with its original
            # traceback.
            raise

    return wrapper
|
|
|
|
|
def _is_exception_related_to_db(e: Exception) -> bool: |
|
""" |
|
Returns True if the exception is related to the DB |
|
""" |
|
|
|
import httpx |
|
from prisma.errors import PrismaError |
|
|
|
return isinstance(e, (PrismaError, httpx.ConnectError, httpx.TimeoutException)) |
|
|
|
|
|
async def _handle_logging_db_exception(
    e: Exception,
    func: Callable,
    kwargs: Dict,
    args: Tuple,
    start_time: datetime,
    end_time: datetime,
) -> None:
    """
    Report a failed DB call to the proxy's service logger.

    Nothing is logged unless ``_is_exception_related_to_db(e)`` returns True.
    For a DB-related exception, the service logger's
    ``async_service_failure_hook`` is awaited with ``ServiceTypes.DB`` as the
    service and the function's name as the call type.

    Args:
        e: The exception raised by the DB function
        func: The function that raised; its ``__name__`` is reported
        kwargs: Keyword arguments the function was called with; the
            "parent_otel_span" entry, if present, is forwarded to the hook
        args: Positional arguments the function was called with
        start_time: When the call started
        end_time: When the call failed

    Returns:
        None
    """
    from litellm.proxy.proxy_server import proxy_logging_obj

    if _is_exception_related_to_db(e) is True:
        report_failure = proxy_logging_obj.service_logging_obj.async_service_failure_hook
        function_name = func.__name__
        parent_span = kwargs.get("parent_otel_span")
        elapsed_seconds = (end_time - start_time).total_seconds()
        failure_details = {
            "function_name": function_name,
            "function_kwargs": kwargs,
            "function_args": args,
        }

        await report_failure(
            error=e,
            service=ServiceTypes.DB,
            call_type=function_name,
            parent_otel_span=parent_span,
            duration=elapsed_seconds,
            start_time=start_time,
            end_time=end_time,
            event_metadata=failure_details,
        )
|
|