|
import json |
|
from datetime import datetime |
|
from typing import Optional, Union |
|
from urllib.parse import urlparse |
|
|
|
import httpx |
|
|
|
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj |
|
from litellm.proxy._types import PassThroughEndpointLoggingResultValues |
|
from litellm.types.utils import StandardPassThroughResponseObject |
|
from litellm.utils import executor as thread_pool_executor |
|
|
|
from .llm_provider_handlers.anthropic_passthrough_logging_handler import ( |
|
AnthropicPassthroughLoggingHandler, |
|
) |
|
from .llm_provider_handlers.assembly_passthrough_logging_handler import ( |
|
AssemblyAIPassthroughLoggingHandler, |
|
) |
|
from .llm_provider_handlers.vertex_passthrough_logging_handler import ( |
|
VertexPassthroughLoggingHandler, |
|
) |
|
|
|
|
|
class PassThroughEndpointLogging: |
|
def __init__(self): |
|
self.TRACKED_VERTEX_ROUTES = [ |
|
"generateContent", |
|
"streamGenerateContent", |
|
"predict", |
|
] |
|
|
|
|
|
self.TRACKED_ANTHROPIC_ROUTES = ["/messages"] |
|
|
|
self.assemblyai_passthrough_logging_handler = ( |
|
AssemblyAIPassthroughLoggingHandler() |
|
) |
|
|
|
async def _handle_logging( |
|
self, |
|
logging_obj: LiteLLMLoggingObj, |
|
standard_logging_response_object: Union[ |
|
StandardPassThroughResponseObject, |
|
PassThroughEndpointLoggingResultValues, |
|
dict, |
|
], |
|
result: str, |
|
start_time: datetime, |
|
end_time: datetime, |
|
cache_hit: bool, |
|
**kwargs, |
|
): |
|
"""Helper function to handle both sync and async logging operations""" |
|
|
|
thread_pool_executor.submit( |
|
logging_obj.success_handler, |
|
standard_logging_response_object, |
|
start_time, |
|
end_time, |
|
cache_hit, |
|
**kwargs, |
|
) |
|
|
|
|
|
await logging_obj.async_success_handler( |
|
result=( |
|
json.dumps(result) |
|
if isinstance(result, dict) |
|
else standard_logging_response_object |
|
), |
|
start_time=start_time, |
|
end_time=end_time, |
|
cache_hit=False, |
|
**kwargs, |
|
) |
|
|
|
async def pass_through_async_success_handler( |
|
self, |
|
httpx_response: httpx.Response, |
|
response_body: Optional[dict], |
|
logging_obj: LiteLLMLoggingObj, |
|
url_route: str, |
|
result: str, |
|
start_time: datetime, |
|
end_time: datetime, |
|
cache_hit: bool, |
|
**kwargs, |
|
): |
|
standard_logging_response_object: Optional[ |
|
PassThroughEndpointLoggingResultValues |
|
] = None |
|
if self.is_vertex_route(url_route): |
|
vertex_passthrough_logging_handler_result = ( |
|
VertexPassthroughLoggingHandler.vertex_passthrough_handler( |
|
httpx_response=httpx_response, |
|
logging_obj=logging_obj, |
|
url_route=url_route, |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
cache_hit=cache_hit, |
|
**kwargs, |
|
) |
|
) |
|
standard_logging_response_object = ( |
|
vertex_passthrough_logging_handler_result["result"] |
|
) |
|
kwargs = vertex_passthrough_logging_handler_result["kwargs"] |
|
elif self.is_anthropic_route(url_route): |
|
anthropic_passthrough_logging_handler_result = ( |
|
AnthropicPassthroughLoggingHandler.anthropic_passthrough_handler( |
|
httpx_response=httpx_response, |
|
response_body=response_body or {}, |
|
logging_obj=logging_obj, |
|
url_route=url_route, |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
cache_hit=cache_hit, |
|
**kwargs, |
|
) |
|
) |
|
|
|
standard_logging_response_object = ( |
|
anthropic_passthrough_logging_handler_result["result"] |
|
) |
|
kwargs = anthropic_passthrough_logging_handler_result["kwargs"] |
|
elif self.is_assemblyai_route(url_route): |
|
if ( |
|
AssemblyAIPassthroughLoggingHandler._should_log_request( |
|
httpx_response.request.method |
|
) |
|
is not True |
|
): |
|
return |
|
self.assemblyai_passthrough_logging_handler.assemblyai_passthrough_logging_handler( |
|
httpx_response=httpx_response, |
|
response_body=response_body or {}, |
|
logging_obj=logging_obj, |
|
url_route=url_route, |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
cache_hit=cache_hit, |
|
**kwargs, |
|
) |
|
return |
|
|
|
if standard_logging_response_object is None: |
|
standard_logging_response_object = StandardPassThroughResponseObject( |
|
response=httpx_response.text |
|
) |
|
|
|
await self._handle_logging( |
|
logging_obj=logging_obj, |
|
standard_logging_response_object=standard_logging_response_object, |
|
result=result, |
|
start_time=start_time, |
|
end_time=end_time, |
|
cache_hit=cache_hit, |
|
**kwargs, |
|
) |
|
|
|
def is_vertex_route(self, url_route: str): |
|
for route in self.TRACKED_VERTEX_ROUTES: |
|
if route in url_route: |
|
return True |
|
return False |
|
|
|
def is_anthropic_route(self, url_route: str): |
|
for route in self.TRACKED_ANTHROPIC_ROUTES: |
|
if route in url_route: |
|
return True |
|
return False |
|
|
|
def is_assemblyai_route(self, url_route: str): |
|
parsed_url = urlparse(url_route) |
|
if parsed_url.hostname == "api.assemblyai.com": |
|
return True |
|
elif "/transcript" in parsed_url.path: |
|
return True |
|
return False |
|
|