|
import asyncio |
|
from datetime import datetime, timedelta |
|
from typing import TYPE_CHECKING, Any, Optional, Union |
|
|
|
import litellm |
|
from litellm._logging import verbose_logger |
|
from litellm.proxy._types import UserAPIKeyAuth |
|
|
|
from .integrations.custom_logger import CustomLogger |
|
from .integrations.datadog.datadog import DataDogLogger |
|
from .integrations.opentelemetry import OpenTelemetry |
|
from .integrations.prometheus_services import PrometheusServicesLogger |
|
from .types.services import ServiceLoggerPayload, ServiceTypes |
|
|
|
if TYPE_CHECKING: |
|
from opentelemetry.trace import Span as _Span |
|
|
|
Span = _Span |
|
OTELClass = OpenTelemetry |
|
else: |
|
Span = Any |
|
OTELClass = Any |
|
|
|
|
|
class ServiceLogging(CustomLogger): |
|
""" |
|
Separate class used for monitoring health of litellm-adjacent services (redis/postgres). |
|
""" |
|
|
|
def __init__(self, mock_testing: bool = False) -> None: |
|
self.mock_testing = mock_testing |
|
self.mock_testing_sync_success_hook = 0 |
|
self.mock_testing_async_success_hook = 0 |
|
self.mock_testing_sync_failure_hook = 0 |
|
self.mock_testing_async_failure_hook = 0 |
|
if "prometheus_system" in litellm.service_callback: |
|
self.prometheusServicesLogger = PrometheusServicesLogger() |
|
|
|
def service_success_hook( |
|
self, |
|
service: ServiceTypes, |
|
duration: float, |
|
call_type: str, |
|
parent_otel_span: Optional[Span] = None, |
|
start_time: Optional[Union[datetime, float]] = None, |
|
end_time: Optional[Union[float, datetime]] = None, |
|
): |
|
""" |
|
Handles both sync and async monitoring by checking for existing event loop. |
|
""" |
|
|
|
if self.mock_testing: |
|
self.mock_testing_sync_success_hook += 1 |
|
|
|
try: |
|
|
|
loop = asyncio.get_event_loop() |
|
|
|
if loop.is_running(): |
|
|
|
loop.create_task( |
|
self.async_service_success_hook( |
|
service=service, |
|
duration=duration, |
|
call_type=call_type, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
) |
|
else: |
|
|
|
loop.run_until_complete( |
|
self.async_service_success_hook( |
|
service=service, |
|
duration=duration, |
|
call_type=call_type, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
) |
|
except RuntimeError: |
|
|
|
asyncio.run( |
|
self.async_service_success_hook( |
|
service=service, |
|
duration=duration, |
|
call_type=call_type, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
) |
|
|
|
def service_failure_hook( |
|
self, service: ServiceTypes, duration: float, error: Exception, call_type: str |
|
): |
|
""" |
|
[TODO] Not implemented for sync calls yet. V0 is focused on async monitoring (used by proxy). |
|
""" |
|
if self.mock_testing: |
|
self.mock_testing_sync_failure_hook += 1 |
|
|
|
async def async_service_success_hook( |
|
self, |
|
service: ServiceTypes, |
|
call_type: str, |
|
duration: float, |
|
parent_otel_span: Optional[Span] = None, |
|
start_time: Optional[Union[datetime, float]] = None, |
|
end_time: Optional[Union[datetime, float]] = None, |
|
event_metadata: Optional[dict] = None, |
|
): |
|
""" |
|
- For counting if the redis, postgres call is successful |
|
""" |
|
if self.mock_testing: |
|
self.mock_testing_async_success_hook += 1 |
|
|
|
payload = ServiceLoggerPayload( |
|
is_error=False, |
|
error=None, |
|
service=service, |
|
duration=duration, |
|
call_type=call_type, |
|
) |
|
|
|
for callback in litellm.service_callback: |
|
if callback == "prometheus_system": |
|
await self.init_prometheus_services_logger_if_none() |
|
await self.prometheusServicesLogger.async_service_success_hook( |
|
payload=payload |
|
) |
|
elif callback == "datadog" or isinstance(callback, DataDogLogger): |
|
await self.init_datadog_logger_if_none() |
|
await self.dd_logger.async_service_success_hook( |
|
payload=payload, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
event_metadata=event_metadata, |
|
) |
|
elif callback == "otel" or isinstance(callback, OpenTelemetry): |
|
from litellm.proxy.proxy_server import open_telemetry_logger |
|
|
|
await self.init_otel_logger_if_none() |
|
|
|
if ( |
|
parent_otel_span is not None |
|
and open_telemetry_logger is not None |
|
and isinstance(open_telemetry_logger, OpenTelemetry) |
|
): |
|
await self.otel_logger.async_service_success_hook( |
|
payload=payload, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
event_metadata=event_metadata, |
|
) |
|
|
|
async def init_prometheus_services_logger_if_none(self): |
|
""" |
|
initializes prometheusServicesLogger if it is None or no attribute exists on ServiceLogging Object |
|
|
|
""" |
|
if not hasattr(self, "prometheusServicesLogger"): |
|
self.prometheusServicesLogger = PrometheusServicesLogger() |
|
elif self.prometheusServicesLogger is None: |
|
self.prometheusServicesLogger = self.prometheusServicesLogger() |
|
return |
|
|
|
async def init_datadog_logger_if_none(self): |
|
""" |
|
initializes dd_logger if it is None or no attribute exists on ServiceLogging Object |
|
|
|
""" |
|
from litellm.integrations.datadog.datadog import DataDogLogger |
|
|
|
if not hasattr(self, "dd_logger"): |
|
self.dd_logger: DataDogLogger = DataDogLogger() |
|
|
|
return |
|
|
|
async def init_otel_logger_if_none(self): |
|
""" |
|
initializes otel_logger if it is None or no attribute exists on ServiceLogging Object |
|
|
|
""" |
|
from litellm.proxy.proxy_server import open_telemetry_logger |
|
|
|
if not hasattr(self, "otel_logger"): |
|
if open_telemetry_logger is not None and isinstance( |
|
open_telemetry_logger, OpenTelemetry |
|
): |
|
self.otel_logger: OpenTelemetry = open_telemetry_logger |
|
else: |
|
verbose_logger.warning( |
|
"ServiceLogger: open_telemetry_logger is None or not an instance of OpenTelemetry" |
|
) |
|
return |
|
|
|
async def async_service_failure_hook( |
|
self, |
|
service: ServiceTypes, |
|
duration: float, |
|
error: Union[str, Exception], |
|
call_type: str, |
|
parent_otel_span: Optional[Span] = None, |
|
start_time: Optional[Union[datetime, float]] = None, |
|
end_time: Optional[Union[float, datetime]] = None, |
|
event_metadata: Optional[dict] = None, |
|
): |
|
""" |
|
- For counting if the redis, postgres call is unsuccessful |
|
""" |
|
if self.mock_testing: |
|
self.mock_testing_async_failure_hook += 1 |
|
|
|
error_message = "" |
|
if isinstance(error, Exception): |
|
error_message = str(error) |
|
elif isinstance(error, str): |
|
error_message = error |
|
|
|
payload = ServiceLoggerPayload( |
|
is_error=True, |
|
error=error_message, |
|
service=service, |
|
duration=duration, |
|
call_type=call_type, |
|
) |
|
|
|
for callback in litellm.service_callback: |
|
if callback == "prometheus_system": |
|
await self.init_prometheus_services_logger_if_none() |
|
await self.prometheusServicesLogger.async_service_failure_hook( |
|
payload=payload, |
|
error=error, |
|
) |
|
elif callback == "datadog" or isinstance(callback, DataDogLogger): |
|
await self.init_datadog_logger_if_none() |
|
await self.dd_logger.async_service_failure_hook( |
|
payload=payload, |
|
error=error_message, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
event_metadata=event_metadata, |
|
) |
|
elif callback == "otel" or isinstance(callback, OpenTelemetry): |
|
from litellm.proxy.proxy_server import open_telemetry_logger |
|
|
|
await self.init_otel_logger_if_none() |
|
|
|
if not isinstance(error, str): |
|
error = str(error) |
|
|
|
if ( |
|
parent_otel_span is not None |
|
and open_telemetry_logger is not None |
|
and isinstance(open_telemetry_logger, OpenTelemetry) |
|
): |
|
await self.otel_logger.async_service_success_hook( |
|
payload=payload, |
|
parent_otel_span=parent_otel_span, |
|
start_time=start_time, |
|
end_time=end_time, |
|
event_metadata=event_metadata, |
|
) |
|
|
|
async def async_post_call_failure_hook( |
|
self, |
|
request_data: dict, |
|
original_exception: Exception, |
|
user_api_key_dict: UserAPIKeyAuth, |
|
): |
|
""" |
|
Hook to track failed litellm-service calls |
|
""" |
|
return await super().async_post_call_failure_hook( |
|
request_data, |
|
original_exception, |
|
user_api_key_dict, |
|
) |
|
|
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): |
|
""" |
|
Hook to track latency for litellm proxy llm api calls |
|
""" |
|
try: |
|
_duration = end_time - start_time |
|
if isinstance(_duration, timedelta): |
|
_duration = _duration.total_seconds() |
|
elif isinstance(_duration, float): |
|
pass |
|
else: |
|
raise Exception( |
|
"Duration={} is not a float or timedelta object. type={}".format( |
|
_duration, type(_duration) |
|
) |
|
) |
|
await self.async_service_success_hook( |
|
service=ServiceTypes.LITELLM, |
|
duration=_duration, |
|
call_type=kwargs["call_type"], |
|
) |
|
except Exception as e: |
|
raise e |
|
|