import asyncio
import functools
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Optional, Union
from litellm._logging import verbose_logger
from litellm.types.utils import (
    ModelResponse,
    ModelResponseStream,
    TextCompletionResponse,
)
if TYPE_CHECKING:
    from litellm import ModelResponse as _ModelResponse
    from litellm.litellm_core_utils.litellm_logging import (
        Logging as LiteLLMLoggingObject,
    )

    LiteLLMModelResponse = _ModelResponse
else:
    LiteLLMModelResponse = Any
    LiteLLMLoggingObject = Any
import litellm

"""
Helper utils used for logging callbacks
"""


def convert_litellm_response_object_to_str(
    response_obj: Union[Any, LiteLLMModelResponse]
) -> Optional[str]:
    """
    Get the string of the response object from LiteLLM
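
    Example (illustrative sketch; the model name and output below are hypothetical):
        >>> response = litellm.completion(
        ...     model="gpt-4o-mini",
        ...     messages=[{"role": "user", "content": "Hi"}],
        ... )
        >>> convert_litellm_response_object_to_str(response)
        'Hello! How can I help you today?'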
"""
if isinstance(response_obj, litellm.ModelResponse):
response_str = ""
for choice in response_obj.choices:
if isinstance(choice, litellm.Choices):
if choice.message.content and isinstance(choice.message.content, str):
response_str += choice.message.content
return response_str
return None


def _assemble_complete_response_from_streaming_chunks(
    result: Union[ModelResponse, TextCompletionResponse, ModelResponseStream],
    start_time: datetime,
    end_time: datetime,
    request_kwargs: dict,
    streaming_chunks: List[Any],
    is_async: bool,
):
"""
Assemble a complete response from a streaming chunks
- assemble a complete streaming response if result.choices[0].finish_reason is not None
- else append the chunk to the streaming_chunks
Args:
result: ModelResponse
start_time: datetime
end_time: datetime
request_kwargs: dict
streaming_chunks: List[Any]
is_async: bool
Returns:
Optional[Union[ModelResponse, TextCompletionResponse]]: Complete streaming response
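
    Example (illustrative sketch; `chunk` stands in for a hypothetical stream
    chunk - only a chunk whose finish_reason is set triggers assembly):
        >>> chunks: List[Any] = []
        >>> complete = _assemble_complete_response_from_streaming_chunks(
        ...     result=chunk,
        ...     start_time=start_time,
        ...     end_time=end_time,
        ...     request_kwargs={"messages": [{"role": "user", "content": "Hi"}]},
        ...     streaming_chunks=chunks,
        ...     is_async=False,
        ... )
        >>> # `complete` is None until the final chunk arrives, then a ModelResponse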
"""
complete_streaming_response: Optional[
Union[ModelResponse, TextCompletionResponse]
] = None
if result.choices[0].finish_reason is not None: # if it's the last chunk
streaming_chunks.append(result)
try:
complete_streaming_response = litellm.stream_chunk_builder(
chunks=streaming_chunks,
messages=request_kwargs.get("messages", None),
start_time=start_time,
end_time=end_time,
)
except Exception as e:
log_message = (
"Error occurred building stream chunk in {} success logging: {}".format(
"async" if is_async else "sync", str(e)
)
)
verbose_logger.exception(log_message)
complete_streaming_response = None
else:
streaming_chunks.append(result)
return complete_streaming_response


def _set_duration_in_model_call_details(
    logging_obj: Any,  # we're not guaranteed this will be `LiteLLMLoggingObject`
    start_time: datetime,
    end_time: datetime,
):
    """Helper to set duration in model_call_details, with error handling"""
    try:
        duration_ms = (end_time - start_time).total_seconds() * 1000
        if logging_obj and hasattr(logging_obj, "model_call_details"):
            logging_obj.model_call_details["llm_api_duration_ms"] = duration_ms
        else:
            verbose_logger.debug(
                "`logging_obj` not found - unable to track `llm_api_duration_ms`"
            )
    except Exception as e:
        verbose_logger.warning(f"Error setting `llm_api_duration_ms`: {str(e)}")


def track_llm_api_timing():
    """
    Decorator to track LLM API call timing for both sync and async functions.
    The `logging_obj` is expected to be passed as a keyword argument to the
    decorated function.
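
    Example (illustrative sketch; `my_logging_obj` is a hypothetical object
    exposing a `model_call_details` dict):
        >>> @track_llm_api_timing()
        ... async def call_llm(prompt: str, logging_obj: Any = None):
        ...     await asyncio.sleep(0.1)  # stand-in for the actual API call
        >>> asyncio.run(call_llm("Hi", logging_obj=my_logging_obj))
        >>> my_logging_obj.model_call_details["llm_api_duration_ms"]  # ~100.0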
"""
def decorator(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = datetime.now()
try:
result = await func(*args, **kwargs)
return result
finally:
end_time = datetime.now()
_set_duration_in_model_call_details(
logging_obj=kwargs.get("logging_obj", None),
start_time=start_time,
end_time=end_time,
)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = datetime.now()
try:
result = func(*args, **kwargs)
return result
finally:
end_time = datetime.now()
_set_duration_in_model_call_details(
logging_obj=kwargs.get("logging_obj", None),
start_time=start_time,
end_time=end_time,
)
# Check if the function is async or sync
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator