|
|
|
|
|
import asyncio |
|
import datetime |
|
import os |
|
import random |
|
import time |
|
from datetime import timedelta |
|
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union |
|
|
|
from openai import APIError |
|
|
|
import litellm |
|
import litellm.litellm_core_utils |
|
import litellm.litellm_core_utils.litellm_logging |
|
import litellm.types |
|
from litellm._logging import verbose_logger, verbose_proxy_logger |
|
from litellm.caching.caching import DualCache |
|
from litellm.integrations.custom_batch_logger import CustomBatchLogger |
|
from litellm.litellm_core_utils.duration_parser import duration_in_seconds |
|
from litellm.litellm_core_utils.exception_mapping_utils import ( |
|
_add_key_name_and_team_to_alert, |
|
) |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
get_async_httpx_client, |
|
httpxSpecialProvider, |
|
) |
|
from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent |
|
from litellm.types.integrations.slack_alerting import * |
|
|
|
from ..email_templates.templates import * |
|
from .batching_handler import send_to_webhook, squash_payloads |
|
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables |
|
|
|
if TYPE_CHECKING: |
|
from litellm.router import Router as _Router |
|
|
|
Router = _Router |
|
else: |
|
Router = Any |
|
|
|
|
|
class SlackAlerting(CustomBatchLogger): |
|
""" |
|
Class for sending Slack Alerts |
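
    Example (illustrative only - shows the constructor arguments defined below):
        slack_alerting = SlackAlerting(
            alerting=["slack"],
            alerting_threshold=300,
            alert_types=[AlertType.llm_too_slow, AlertType.budget_alerts],
        )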
|
""" |
|
|
|
|
|
def __init__( |
|
self, |
|
internal_usage_cache: Optional[DualCache] = None, |
|
        alerting_threshold: Optional[float] = None,
|
alerting: Optional[List] = [], |
|
alert_types: List[AlertType] = DEFAULT_ALERT_TYPES, |
|
alert_to_webhook_url: Optional[ |
|
Dict[AlertType, Union[List[str], str]] |
|
] = None, |
|
alerting_args={}, |
|
default_webhook_url: Optional[str] = None, |
|
**kwargs, |
|
): |
|
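        # default: treat requests slower than 300s (5 minutes) as slow / hanging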
if alerting_threshold is None: |
|
alerting_threshold = 300 |
|
self.alerting_threshold = alerting_threshold |
|
self.alerting = alerting |
|
self.alert_types = alert_types |
|
self.internal_usage_cache = internal_usage_cache or DualCache() |
|
self.async_http_handler = get_async_httpx_client( |
|
llm_provider=httpxSpecialProvider.LoggingCallback |
|
) |
|
self.alert_to_webhook_url = process_slack_alerting_variables( |
|
alert_to_webhook_url=alert_to_webhook_url |
|
) |
|
self.is_running = False |
|
self.alerting_args = SlackAlertingArgs(**alerting_args) |
|
self.default_webhook_url = default_webhook_url |
|
self.flush_lock = asyncio.Lock() |
|
super().__init__(**kwargs, flush_lock=self.flush_lock) |
|
|
|
def update_values( |
|
self, |
|
alerting: Optional[List] = None, |
|
alerting_threshold: Optional[float] = None, |
|
alert_types: Optional[List[AlertType]] = None, |
|
alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None, |
|
alerting_args: Optional[Dict] = None, |
|
llm_router: Optional[Router] = None, |
|
): |
|
if alerting is not None: |
|
self.alerting = alerting |
|
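            # turning alerting on also starts the background task that periodically flushes queued alerts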
asyncio.create_task(self.periodic_flush()) |
|
if alerting_threshold is not None: |
|
self.alerting_threshold = alerting_threshold |
|
if alert_types is not None: |
|
self.alert_types = alert_types |
|
if alerting_args is not None: |
|
self.alerting_args = SlackAlertingArgs(**alerting_args) |
|
if alert_to_webhook_url is not None: |
|
|
|
if self.alert_to_webhook_url is None: |
|
self.alert_to_webhook_url = process_slack_alerting_variables( |
|
alert_to_webhook_url=alert_to_webhook_url |
|
) |
|
else: |
|
_new_values = ( |
|
process_slack_alerting_variables( |
|
alert_to_webhook_url=alert_to_webhook_url |
|
) |
|
or {} |
|
) |
|
self.alert_to_webhook_url.update(_new_values) |
|
if llm_router is not None: |
|
self.llm_router = llm_router |
|
|
|
async def deployment_in_cooldown(self): |
|
pass |
|
|
|
async def deployment_removed_from_cooldown(self): |
|
pass |
|
|
|
def _all_possible_alert_types(self): |
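        """Return all supported AlertType values."""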
|
|
|
|
|
|
|
return list(AlertType) |
|
|
|
def _response_taking_too_long_callback_helper( |
|
self, |
|
kwargs, |
|
start_time, |
|
end_time, |
|
): |
|
try: |
|
time_difference = end_time - start_time |
|
|
|
time_difference_float = time_difference.total_seconds() |
|
litellm_params = kwargs.get("litellm_params", {}) |
|
model = kwargs.get("model", "") |
|
api_base = litellm.get_api_base(model=model, optional_params=litellm_params) |
|
messages = kwargs.get("messages", None) |
|
|
|
if messages is None: |
|
messages = kwargs.get("input", None) |
|
|
|
|
|
_messages = str(messages)[:100] |
|
|
|
return time_difference_float, model, api_base, _messages |
|
except Exception as e: |
|
raise e |
|
|
|
def _get_deployment_latencies_to_alert(self, metadata=None): |
|
if metadata is None: |
|
return None |
|
|
|
if "_latency_per_deployment" in metadata: |
|
|
|
|
|
""" |
|
_latency_per_deployment: { |
|
api_base: 0.01336697916666667 |
|
} |
|
""" |
|
_message_to_send = "" |
|
_deployment_latencies = metadata["_latency_per_deployment"] |
|
if len(_deployment_latencies) == 0: |
|
return None |
|
_deployment_latency_map: Optional[dict] = None |
|
try: |
|
|
|
_deployment_latencies = sorted( |
|
_deployment_latencies.items(), key=lambda x: x[1] |
|
) |
|
_deployment_latency_map = dict(_deployment_latencies) |
|
except Exception: |
|
pass |
|
|
|
if _deployment_latency_map is None: |
|
return |
|
|
|
for api_base, latency in _deployment_latency_map.items(): |
|
_message_to_send += f"\n{api_base}: {round(latency,2)}s" |
|
_message_to_send = "```" + _message_to_send + "```" |
|
return _message_to_send |
|
|
|
async def response_taking_too_long_callback( |
|
self, |
|
kwargs, |
|
completion_response, |
|
start_time, |
|
end_time, |
|
): |
|
if self.alerting is None or self.alert_types is None: |
|
return |
|
|
|
time_difference_float, model, api_base, messages = ( |
|
self._response_taking_too_long_callback_helper( |
|
kwargs=kwargs, |
|
start_time=start_time, |
|
end_time=end_time, |
|
) |
|
) |
|
if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions: |
|
messages = "Message not logged. litellm.redact_messages_in_exceptions=True" |
|
request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`" |
|
slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`" |
|
alerting_metadata: dict = {} |
|
if time_difference_float > self.alerting_threshold: |
|
|
|
if ( |
|
kwargs is not None |
|
and "litellm_params" in kwargs |
|
and "metadata" in kwargs["litellm_params"] |
|
): |
|
_metadata: dict = kwargs["litellm_params"]["metadata"] |
|
request_info = _add_key_name_and_team_to_alert( |
|
request_info=request_info, metadata=_metadata |
|
) |
|
|
|
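                # include per-deployment latencies, if present in the request metadata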
_deployment_latency_map = self._get_deployment_latencies_to_alert( |
|
metadata=_metadata |
|
) |
|
if _deployment_latency_map is not None: |
|
request_info += ( |
|
f"\nAvailable Deployment Latencies\n{_deployment_latency_map}" |
|
) |
|
|
|
if "alerting_metadata" in _metadata: |
|
alerting_metadata = _metadata["alerting_metadata"] |
|
await self.send_alert( |
|
message=slow_message + request_info, |
|
level="Low", |
|
alert_type=AlertType.llm_too_slow, |
|
alerting_metadata=alerting_metadata, |
|
) |
|
|
|
async def async_update_daily_reports( |
|
self, deployment_metrics: DeploymentMetrics |
|
) -> int: |
|
""" |
|
Store the perf by deployment in cache |
|
- Number of failed requests per deployment |
|
- Latency / output tokens per deployment |
|
|
|
'deployment_id:daily_metrics:failed_requests' |
|
'deployment_id:daily_metrics:latency_per_output_token' |
|
|
|
Returns |
|
int - count of metrics set (1 - if just latency, 2 - if failed + latency) |
|
""" |
|
|
|
return_val = 0 |
|
try: |
|
|
|
if deployment_metrics.failed_request: |
|
await self.internal_usage_cache.async_increment_cache( |
|
key="{}:{}".format( |
|
deployment_metrics.id, |
|
SlackAlertingCacheKeys.failed_requests_key.value, |
|
), |
|
value=1, |
|
parent_otel_span=None, |
|
) |
|
|
|
return_val += 1 |
|
|
|
|
|
if deployment_metrics.latency_per_output_token is not None: |
|
await self.internal_usage_cache.async_increment_cache( |
|
key="{}:{}".format( |
|
deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value |
|
), |
|
value=deployment_metrics.latency_per_output_token, |
|
parent_otel_span=None, |
|
) |
|
|
|
return_val += 1 |
|
|
|
return return_val |
|
except Exception: |
|
return 0 |
|
|
|
async def send_daily_reports(self, router) -> bool: |
|
""" |
|
Send a daily report on: |
|
- Top 5 deployments with most failed requests |
|
- Top 5 slowest deployments (normalized by latency/output tokens) |
|
|
|
Get the value from redis cache (if available) or in-memory and send it |
|
|
|
Cleanup: |
|
- reset values in cache -> prevent memory leak |
|
|
|
Returns: |
|
        True -> if successfully sent
|
False -> if not sent |
|
""" |
|
|
|
ids = router.get_model_ids() |
|
|
|
|
|
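        # cache keys are "<model_id>:<metric>" - one failed-request counter and one latency total per deployment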
failed_request_keys = [ |
|
"{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value) |
|
for id in ids |
|
] |
|
latency_keys = [ |
|
"{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids |
|
] |
|
|
|
combined_metrics_keys = failed_request_keys + latency_keys |
|
|
|
combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache( |
|
keys=combined_metrics_keys |
|
) |
|
|
|
if combined_metrics_values is None: |
|
return False |
|
|
|
all_none = True |
|
for val in combined_metrics_values: |
|
if val is not None and val > 0: |
|
all_none = False |
|
break |
|
|
|
if all_none: |
|
return False |
|
|
|
failed_request_values = combined_metrics_values[ |
|
: len(failed_request_keys) |
|
] |
|
latency_values = combined_metrics_values[len(failed_request_keys) :] |
|
|
|
|
|
|
|
placeholder_value = 0 |
|
replaced_failed_values = [ |
|
value if value is not None else placeholder_value |
|
for value in failed_request_values |
|
] |
|
|
|
|
|
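        # pick the 5 deployments with the most failed requests (indices into failed_request_keys)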
top_5_failed = sorted( |
|
range(len(replaced_failed_values)), |
|
key=lambda i: replaced_failed_values[i], |
|
reverse=True, |
|
)[:5] |
|
top_5_failed = [ |
|
index for index in top_5_failed if replaced_failed_values[index] > 0 |
|
] |
|
|
|
|
|
|
|
placeholder_value = 0 |
|
replaced_slowest_values = [ |
|
value if value is not None else placeholder_value |
|
for value in latency_values |
|
] |
|
|
|
|
|
top_5_slowest = sorted( |
|
range(len(replaced_slowest_values)), |
|
key=lambda i: replaced_slowest_values[i], |
|
reverse=True, |
|
)[:5] |
|
top_5_slowest = [ |
|
index for index in top_5_slowest if replaced_slowest_values[index] > 0 |
|
] |
|
|
|
|
|
message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics 📈: \n\n" |
|
|
|
message += "\n\n*❗️ Top Deployments with Most Failed Requests:*\n\n" |
|
if not top_5_failed: |
|
message += "\tNone\n" |
|
for i in range(len(top_5_failed)): |
|
key = failed_request_keys[top_5_failed[i]].split(":")[0] |
|
_deployment = router.get_model_info(key) |
|
if isinstance(_deployment, dict): |
|
deployment_name = _deployment["litellm_params"].get("model", "") |
|
else: |
|
return False |
|
|
|
api_base = litellm.get_api_base( |
|
model=deployment_name, |
|
optional_params=( |
|
_deployment["litellm_params"] if _deployment is not None else {} |
|
), |
|
) |
|
if api_base is None: |
|
api_base = "" |
|
value = replaced_failed_values[top_5_failed[i]] |
|
message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n" |
|
|
|
message += "\n\n*😅 Top Slowest Deployments:*\n\n" |
|
if not top_5_slowest: |
|
message += "\tNone\n" |
|
for i in range(len(top_5_slowest)): |
|
key = latency_keys[top_5_slowest[i]].split(":")[0] |
|
_deployment = router.get_model_info(key) |
|
if _deployment is not None: |
|
deployment_name = _deployment["litellm_params"].get("model", "") |
|
else: |
|
deployment_name = "" |
|
api_base = litellm.get_api_base( |
|
model=deployment_name, |
|
optional_params=( |
|
_deployment["litellm_params"] if _deployment is not None else {} |
|
), |
|
) |
|
value = round(replaced_slowest_values[top_5_slowest[i]], 3) |
|
message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n" |
|
|
|
|
|
latency_cache_keys = [(key, 0) for key in latency_keys] |
|
failed_request_cache_keys = [(key, 0) for key in failed_request_keys] |
|
combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys |
|
await self.internal_usage_cache.async_set_cache_pipeline( |
|
cache_list=combined_metrics_cache_keys |
|
) |
|
|
|
message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s" |
|
|
|
|
|
await self.send_alert( |
|
message=message, |
|
level="Low", |
|
alert_type=AlertType.daily_reports, |
|
alerting_metadata={}, |
|
) |
|
|
|
return True |
|
|
|
async def response_taking_too_long( |
|
self, |
|
start_time: Optional[datetime.datetime] = None, |
|
end_time: Optional[datetime.datetime] = None, |
|
type: Literal["hanging_request", "slow_response"] = "hanging_request", |
|
request_data: Optional[dict] = None, |
|
): |
|
if self.alerting is None or self.alert_types is None: |
|
return |
|
model: str = "" |
|
if request_data is not None: |
|
model = request_data.get("model", "") |
|
messages = request_data.get("messages", None) |
|
if messages is None: |
|
|
|
messages = request_data.get("input", None) |
|
|
|
|
|
try: |
|
messages = str(messages) |
|
messages = messages[:100] |
|
except Exception: |
|
messages = "" |
|
|
|
if ( |
|
litellm.turn_off_message_logging |
|
or litellm.redact_messages_in_exceptions |
|
): |
|
messages = ( |
|
"Message not logged. litellm.redact_messages_in_exceptions=True" |
|
) |
|
request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`" |
|
else: |
|
request_info = "" |
|
|
|
if type == "hanging_request": |
|
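            # wait for the alerting threshold to elapse, then alert only if the request still hasn't completed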
await asyncio.sleep( |
|
self.alerting_threshold |
|
) |
|
alerting_metadata: dict = {} |
|
if await self._request_is_completed(request_data=request_data) is True: |
|
return |
|
|
|
if request_data is not None: |
|
if request_data.get("deployment", None) is not None and isinstance( |
|
request_data["deployment"], dict |
|
): |
|
_api_base = litellm.get_api_base( |
|
model=model, |
|
optional_params=request_data["deployment"].get( |
|
"litellm_params", {} |
|
), |
|
) |
|
|
|
if _api_base is None: |
|
_api_base = "" |
|
|
|
request_info += f"\nAPI Base: {_api_base}" |
|
elif request_data.get("metadata", None) is not None and isinstance( |
|
request_data["metadata"], dict |
|
): |
|
|
|
|
|
_metadata: dict = request_data["metadata"] |
|
_api_base = _metadata.get("api_base", "") |
|
|
|
request_info = _add_key_name_and_team_to_alert( |
|
request_info=request_info, metadata=_metadata |
|
) |
|
|
|
if _api_base is None: |
|
_api_base = "" |
|
|
|
if "alerting_metadata" in _metadata: |
|
alerting_metadata = _metadata["alerting_metadata"] |
|
request_info += f"\nAPI Base: `{_api_base}`" |
|
|
|
alerting_message = ( |
|
f"`Requests are hanging - {self.alerting_threshold}s+ request time`" |
|
) |
|
|
|
if "langfuse" in litellm.success_callback: |
|
langfuse_url = await _add_langfuse_trace_id_to_alert( |
|
request_data=request_data, |
|
) |
|
|
|
if langfuse_url is not None: |
|
request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url) |
|
|
|
|
|
_deployment_latency_map = self._get_deployment_latencies_to_alert( |
|
metadata=request_data.get("metadata", {}) |
|
) |
|
if _deployment_latency_map is not None: |
|
request_info += f"\nDeployment Latencies\n{_deployment_latency_map}" |
|
|
|
await self.send_alert( |
|
message=alerting_message + request_info, |
|
level="Medium", |
|
alert_type=AlertType.llm_requests_hanging, |
|
alerting_metadata=alerting_metadata, |
|
) |
|
|
|
async def failed_tracking_alert(self, error_message: str, failing_model: str): |
|
""" |
|
Raise alert when tracking failed for specific model |
|
|
|
Args: |
|
error_message (str): Error message |
|
failing_model (str): Model that failed tracking |
|
""" |
|
if self.alerting is None or self.alert_types is None: |
|
|
|
return |
|
if "failed_tracking_spend" not in self.alert_types: |
|
return |
|
|
|
_cache: DualCache = self.internal_usage_cache |
|
message = "Failed Tracking Cost for " + error_message |
|
_cache_key = "budget_alerts:failed_tracking:{}".format(failing_model) |
|
result = await _cache.async_get_cache(key=_cache_key) |
|
if result is None: |
|
await self.send_alert( |
|
message=message, |
|
level="High", |
|
alert_type=AlertType.failed_tracking_spend, |
|
alerting_metadata={}, |
|
) |
|
await _cache.async_set_cache( |
|
key=_cache_key, |
|
value="SENT", |
|
ttl=self.alerting_args.budget_alert_ttl, |
|
) |
|
|
|
async def budget_alerts( |
|
self, |
|
type: Literal[ |
|
"token_budget", |
|
"soft_budget", |
|
"user_budget", |
|
"team_budget", |
|
"proxy_budget", |
|
"projected_limit_exceeded", |
|
], |
|
user_info: CallInfo, |
|
): |
|
|
|
|
|
|
|
|
|
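        # fires Slack / webhook / email alerts when a key, user, team or proxy budget is crossed (or close to it)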
_cache: DualCache = self.internal_usage_cache |
|
|
|
if self.alerting is None or self.alert_types is None: |
|
|
|
return |
|
if "budget_alerts" not in self.alert_types: |
|
return |
|
_id: Optional[str] = "default_id" |
|
user_info_json = user_info.model_dump(exclude_none=True) |
|
user_info_str = self._get_user_info_str(user_info) |
|
event: Optional[ |
|
Literal[ |
|
"budget_crossed", |
|
"threshold_crossed", |
|
"projected_limit_exceeded", |
|
"soft_budget_crossed", |
|
] |
|
] = None |
|
event_group: Optional[ |
|
Literal["internal_user", "team", "key", "proxy", "customer"] |
|
] = None |
|
event_message: str = "" |
|
webhook_event: Optional[WebhookEvent] = None |
|
if type == "proxy_budget": |
|
event_group = "proxy" |
|
event_message += "Proxy Budget: " |
|
elif type == "soft_budget": |
|
event_group = "proxy" |
|
event_message += "Soft Budget Crossed: " |
|
elif type == "user_budget": |
|
event_group = "internal_user" |
|
event_message += "User Budget: " |
|
_id = user_info.user_id or _id |
|
elif type == "team_budget": |
|
event_group = "team" |
|
event_message += "Team Budget: " |
|
_id = user_info.team_id or _id |
|
elif type == "token_budget": |
|
event_group = "key" |
|
event_message += "Key Budget: " |
|
_id = user_info.token |
|
elif type == "projected_limit_exceeded": |
|
event_group = "key" |
|
event_message += "Key Budget: Projected Limit Exceeded" |
|
event = "projected_limit_exceeded" |
|
_id = user_info.token |
|
|
|
|
|
if user_info.max_budget is None and user_info.soft_budget is None: |
|
return |
|
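        # fraction of max_budget still remaining - drives the 15% and 5% threshold alerts below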
percent_left: float = 0 |
|
if user_info.max_budget is not None: |
|
if user_info.max_budget > 0: |
|
percent_left = ( |
|
user_info.max_budget - user_info.spend |
|
) / user_info.max_budget |
|
|
|
|
|
if user_info.max_budget is not None: |
|
if user_info.spend >= user_info.max_budget: |
|
event = "budget_crossed" |
|
event_message += ( |
|
f"Budget Crossed\n Total Budget:`{user_info.max_budget}`" |
|
) |
|
elif percent_left <= 0.05: |
|
event = "threshold_crossed" |
|
event_message += "5% Threshold Crossed " |
|
elif percent_left <= 0.15: |
|
event = "threshold_crossed" |
|
event_message += "15% Threshold Crossed" |
|
elif user_info.soft_budget is not None: |
|
if user_info.spend >= user_info.soft_budget: |
|
event = "soft_budget_crossed" |
|
if event is not None and event_group is not None: |
|
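            # de-duplicate: each (event, id) alert is only sent once per budget_alert_ttl window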
_cache_key = "budget_alerts:{}:{}".format(event, _id) |
|
result = await _cache.async_get_cache(key=_cache_key) |
|
if result is None: |
|
webhook_event = WebhookEvent( |
|
event=event, |
|
event_group=event_group, |
|
event_message=event_message, |
|
**user_info_json, |
|
) |
|
await self.send_alert( |
|
message=event_message + "\n\n" + user_info_str, |
|
level="High", |
|
alert_type=AlertType.budget_alerts, |
|
user_info=webhook_event, |
|
alerting_metadata={}, |
|
) |
|
await _cache.async_set_cache( |
|
key=_cache_key, |
|
value="SENT", |
|
ttl=self.alerting_args.budget_alert_ttl, |
|
) |
|
|
|
return |
|
return |
|
|
|
def _get_user_info_str(self, user_info: CallInfo) -> str: |
|
""" |
|
Create a standard message for a budget alert |
|
""" |
|
_all_fields_as_dict = user_info.model_dump(exclude_none=True) |
|
_all_fields_as_dict.pop("token") |
|
msg = "" |
|
for k, v in _all_fields_as_dict.items(): |
|
msg += f"*{k}:* `{v}`\n" |
|
|
|
return msg |
|
|
|
async def customer_spend_alert( |
|
self, |
|
token: Optional[str], |
|
key_alias: Optional[str], |
|
end_user_id: Optional[str], |
|
response_cost: Optional[float], |
|
max_budget: Optional[float], |
|
): |
|
if ( |
|
self.alerting is not None |
|
and "webhook" in self.alerting |
|
and end_user_id is not None |
|
and token is not None |
|
and response_cost is not None |
|
): |
|
|
|
event = WebhookEvent( |
|
spend=response_cost, |
|
max_budget=max_budget, |
|
token=token, |
|
customer_id=end_user_id, |
|
user_id=None, |
|
team_id=None, |
|
user_email=None, |
|
key_alias=key_alias, |
|
projected_exceeded_date=None, |
|
projected_spend=None, |
|
event="spend_tracked", |
|
event_group="customer", |
|
event_message="Customer spend tracked. Customer={}, spend={}".format( |
|
end_user_id, response_cost |
|
), |
|
) |
|
|
|
await self.send_webhook_alert(webhook_event=event) |
|
|
|
def _count_outage_alerts(self, alerts: List[int]) -> str: |
|
""" |
|
Parameters: |
|
- alerts: List[int] -> list of error codes (either 408 or 500+) |
|
|
|
Returns: |
|
- str -> formatted string. This is an alert message, giving a human-friendly description of the errors. |
|
""" |
|
error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0} |
|
for alert in alerts: |
|
if alert == 408: |
|
error_breakdown["Timeout Errors"] += 1 |
|
elif alert >= 500: |
|
error_breakdown["API Errors"] += 1 |
|
else: |
|
error_breakdown["Unknown Errors"] += 1 |
|
|
|
error_msg = "" |
|
for key, value in error_breakdown.items(): |
|
if value > 0: |
|
error_msg += "\n{}: {}\n".format(key, value) |
|
|
|
return error_msg |
|
|
|
def _outage_alert_msg_factory( |
|
self, |
|
alert_type: Literal["Major", "Minor"], |
|
key: Literal["Model", "Region"], |
|
key_val: str, |
|
provider: str, |
|
api_base: Optional[str], |
|
outage_value: BaseOutageModel, |
|
) -> str: |
|
"""Format an alert message for slack""" |
|
headers = {f"{key} Name": key_val, "Provider": provider} |
|
if api_base is not None: |
|
headers["API Base"] = api_base |
|
|
|
headers_str = "\n" |
|
for k, v in headers.items(): |
|
headers_str += f"*{k}:* `{v}`\n" |
|
return f"""\n\n |
|
*⚠️ {alert_type} Service Outage* |
|
|
|
{headers_str} |
|
|
|
*Errors:* |
|
{self._count_outage_alerts(alerts=outage_value["alerts"])} |
|
|
|
*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n |
|
""" |
|
|
|
async def region_outage_alerts( |
|
self, |
|
exception: APIError, |
|
deployment_id: str, |
|
) -> None: |
|
""" |
|
Send slack alert if specific provider region is having an outage. |
|
|
|
Track for 408 (Timeout) and >=500 Error codes |
|
""" |
|
|
|
if self.llm_router is None: |
|
return |
|
|
|
deployment = self.llm_router.get_deployment(model_id=deployment_id) |
|
|
|
if deployment is None: |
|
return |
|
|
|
model = deployment.litellm_params.model |
|
|
|
provider = deployment.litellm_params.custom_llm_provider |
|
if provider is None: |
|
model, provider, _, _ = litellm.get_llm_provider(model=model) |
|
|
|
|
|
region_name = deployment.litellm_params.region_name |
|
if region_name is None: |
|
region_name = litellm.utils._get_model_region( |
|
custom_llm_provider=provider, litellm_params=deployment.litellm_params |
|
) |
|
|
|
if region_name is None: |
|
return |
|
|
|
|
|
cache_key = provider + region_name |
|
|
|
outage_value: Optional[ProviderRegionOutageModel] = ( |
|
await self.internal_usage_cache.async_get_cache(key=cache_key) |
|
) |
|
|
|
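        # only 408 (timeout) and >=500 errors count towards an outage; anything else is ignored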
if ( |
|
getattr(exception, "status_code", None) is None |
|
or ( |
|
exception.status_code != 408 |
|
and exception.status_code < 500 |
|
) |
|
or self.llm_router is None |
|
): |
|
return |
|
|
|
if outage_value is None: |
|
_deployment_set = set() |
|
_deployment_set.add(deployment_id) |
|
outage_value = ProviderRegionOutageModel( |
|
provider_region_id=cache_key, |
|
alerts=[exception.status_code], |
|
minor_alert_sent=False, |
|
major_alert_sent=False, |
|
last_updated_at=time.time(), |
|
deployment_ids=_deployment_set, |
|
) |
|
|
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=cache_key, |
|
value=outage_value, |
|
ttl=self.alerting_args.region_outage_alert_ttl, |
|
) |
|
return |
|
|
|
if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size: |
|
outage_value["alerts"].append(exception.status_code) |
|
else: |
|
pass |
|
_deployment_set = outage_value["deployment_ids"] |
|
_deployment_set.add(deployment_id) |
|
outage_value["deployment_ids"] = _deployment_set |
|
outage_value["last_updated_at"] = time.time() |
|
|
|
|
|
if ( |
|
outage_value["minor_alert_sent"] is False |
|
and len(outage_value["alerts"]) |
|
>= self.alerting_args.minor_outage_alert_threshold |
|
and len(_deployment_set) > 1 |
|
): |
|
msg = self._outage_alert_msg_factory( |
|
alert_type="Minor", |
|
key="Region", |
|
key_val=region_name, |
|
api_base=None, |
|
outage_value=outage_value, |
|
provider=provider, |
|
) |
|
|
|
await self.send_alert( |
|
message=msg, |
|
level="Medium", |
|
alert_type=AlertType.outage_alerts, |
|
alerting_metadata={}, |
|
) |
|
|
|
outage_value["minor_alert_sent"] = True |
|
|
|
|
|
elif ( |
|
outage_value["major_alert_sent"] is False |
|
and len(outage_value["alerts"]) |
|
>= self.alerting_args.major_outage_alert_threshold |
|
and len(_deployment_set) > 1 |
|
): |
|
msg = self._outage_alert_msg_factory( |
|
alert_type="Major", |
|
key="Region", |
|
key_val=region_name, |
|
api_base=None, |
|
outage_value=outage_value, |
|
provider=provider, |
|
) |
|
|
|
|
|
await self.send_alert( |
|
message=msg, |
|
level="High", |
|
alert_type=AlertType.outage_alerts, |
|
alerting_metadata={}, |
|
) |
|
|
|
outage_value["major_alert_sent"] = True |
|
|
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=cache_key, value=outage_value |
|
) |
|
|
|
async def outage_alerts( |
|
self, |
|
exception: APIError, |
|
deployment_id: str, |
|
) -> None: |
|
""" |
|
Send slack alert if model is badly configured / having an outage (408, 401, 429, >=500). |
|
|
|
key = model_id |
|
|
|
value = { |
|
- model_id |
|
- threshold |
|
- alerts [] |
|
} |
|
|
|
ttl = 1hr |
|
max_alerts_size = 10 |
|
""" |
|
try: |
|
            outage_value: Optional[OutageModel] = (
                await self.internal_usage_cache.async_get_cache(key=deployment_id)
            )
|
if ( |
|
getattr(exception, "status_code", None) is None |
|
or ( |
|
exception.status_code != 408 |
|
and exception.status_code < 500 |
|
) |
|
or self.llm_router is None |
|
): |
|
return |
|
|
|
|
|
deployment = self.llm_router.get_deployment(model_id=deployment_id) |
|
if deployment is None: |
|
return |
|
|
|
model = deployment.litellm_params.model |
|
provider = deployment.litellm_params.custom_llm_provider |
|
if provider is None: |
|
try: |
|
model, provider, _, _ = litellm.get_llm_provider(model=model) |
|
except Exception: |
|
provider = "" |
|
api_base = litellm.get_api_base( |
|
model=model, optional_params=deployment.litellm_params |
|
) |
|
|
|
if outage_value is None: |
|
outage_value = OutageModel( |
|
model_id=deployment_id, |
|
alerts=[exception.status_code], |
|
minor_alert_sent=False, |
|
major_alert_sent=False, |
|
last_updated_at=time.time(), |
|
) |
|
|
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=deployment_id, |
|
value=outage_value, |
|
ttl=self.alerting_args.outage_alert_ttl, |
|
) |
|
return |
|
|
|
if ( |
|
len(outage_value["alerts"]) |
|
< self.alerting_args.max_outage_alert_list_size |
|
): |
|
outage_value["alerts"].append(exception.status_code) |
|
else: |
|
pass |
|
|
|
outage_value["last_updated_at"] = time.time() |
|
|
|
|
|
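            # escalate once: a "Minor" alert at the minor threshold, then a "Major" alert at the higher major threshold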
if ( |
|
outage_value["minor_alert_sent"] is False |
|
and len(outage_value["alerts"]) |
|
>= self.alerting_args.minor_outage_alert_threshold |
|
): |
|
msg = self._outage_alert_msg_factory( |
|
alert_type="Minor", |
|
key="Model", |
|
key_val=model, |
|
api_base=api_base, |
|
outage_value=outage_value, |
|
provider=provider, |
|
) |
|
|
|
await self.send_alert( |
|
message=msg, |
|
level="Medium", |
|
alert_type=AlertType.outage_alerts, |
|
alerting_metadata={}, |
|
) |
|
|
|
outage_value["minor_alert_sent"] = True |
|
elif ( |
|
outage_value["major_alert_sent"] is False |
|
and len(outage_value["alerts"]) |
|
>= self.alerting_args.major_outage_alert_threshold |
|
): |
|
msg = self._outage_alert_msg_factory( |
|
alert_type="Major", |
|
key="Model", |
|
key_val=model, |
|
api_base=api_base, |
|
outage_value=outage_value, |
|
provider=provider, |
|
) |
|
|
|
await self.send_alert( |
|
message=msg, |
|
level="High", |
|
alert_type=AlertType.outage_alerts, |
|
alerting_metadata={}, |
|
) |
|
|
|
outage_value["major_alert_sent"] = True |
|
|
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=deployment_id, value=outage_value |
|
) |
|
except Exception: |
|
pass |
|
|
|
async def model_added_alert( |
|
self, model_name: str, litellm_model_name: str, passed_model_info: Any |
|
): |
|
base_model_from_user = getattr(passed_model_info, "base_model", None) |
|
model_info = {} |
|
base_model = "" |
|
if base_model_from_user is not None: |
|
model_info = litellm.model_cost.get(base_model_from_user, {}) |
|
base_model = f"Base Model: `{base_model_from_user}`\n" |
|
else: |
|
model_info = litellm.model_cost.get(litellm_model_name, {}) |
|
model_info_str = "" |
|
for k, v in model_info.items(): |
|
if k == "input_cost_per_token" or k == "output_cost_per_token": |
|
|
|
v = "{:.8f}".format(v) |
|
|
|
model_info_str += f"{k}: {v}\n" |
|
|
|
message = f""" |
|
*🚅 New Model Added* |
|
Model Name: `{model_name}` |
|
{base_model} |
|
|
|
Usage OpenAI Python SDK: |
|
``` |
|
import openai |
|
client = openai.OpenAI( |
|
api_key="your_api_key", |
|
            base_url="{os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}"
|
) |
|
|
|
response = client.chat.completions.create( |
|
model="{model_name}", # model to send to the proxy |
|
messages = [ |
|
{{ |
|
"role": "user", |
|
"content": "this is a test request, write a short poem" |
|
}} |
|
] |
|
) |
|
``` |
|
|
|
Model Info: |
|
``` |
|
{model_info_str} |
|
``` |
|
""" |
|
|
|
alert_val = self.send_alert( |
|
message=message, |
|
level="Low", |
|
alert_type=AlertType.new_model_added, |
|
alerting_metadata={}, |
|
) |
|
|
|
if alert_val is not None and asyncio.iscoroutine(alert_val): |
|
await alert_val |
|
|
|
async def model_removed_alert(self, model_name: str): |
|
pass |
|
|
|
async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool: |
|
""" |
|
Sends structured alert to webhook, if set. |
|
|
|
Currently only implemented for budget alerts |
|
|
|
Returns -> True if sent, False if not. |
|
|
|
Raises Exception |
|
- if WEBHOOK_URL is not set |
|
""" |
|
|
|
webhook_url = os.getenv("WEBHOOK_URL", None) |
|
if webhook_url is None: |
|
raise Exception("Missing webhook_url from environment") |
|
|
|
payload = webhook_event.model_dump_json() |
|
headers = {"Content-type": "application/json"} |
|
|
|
response = await self.async_http_handler.post( |
|
url=webhook_url, |
|
headers=headers, |
|
data=payload, |
|
) |
|
if response.status_code == 200: |
|
return True |
|
else: |
|
print("Error sending webhook alert. Error=", response.text) |
|
|
|
return False |
|
|
|
async def _check_if_using_premium_email_feature( |
|
self, |
|
premium_user: bool, |
|
email_logo_url: Optional[str] = None, |
|
email_support_contact: Optional[str] = None, |
|
): |
|
from litellm.proxy.proxy_server import CommonProxyErrors, premium_user |
|
|
|
if premium_user is not True: |
|
if email_logo_url is not None or email_support_contact is not None: |
|
raise ValueError( |
|
f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}" |
|
) |
|
return |
|
|
|
async def send_key_created_or_user_invited_email( |
|
self, webhook_event: WebhookEvent |
|
) -> bool: |
|
try: |
|
from litellm.proxy.utils import send_email |
|
|
|
if self.alerting is None or "email" not in self.alerting: |
|
|
|
verbose_proxy_logger.error( |
|
"Error sending email alert - 'email' not in self.alerting %s", |
|
self.alerting, |
|
) |
|
return False |
|
from litellm.proxy.proxy_server import premium_user, prisma_client |
|
|
|
email_logo_url = os.getenv( |
|
"SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None) |
|
) |
|
email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None) |
|
await self._check_if_using_premium_email_feature( |
|
premium_user, email_logo_url, email_support_contact |
|
) |
|
if email_logo_url is None: |
|
email_logo_url = LITELLM_LOGO_URL |
|
if email_support_contact is None: |
|
email_support_contact = LITELLM_SUPPORT_CONTACT |
|
|
|
event_name = webhook_event.event_message |
|
recipient_email = webhook_event.user_email |
|
recipient_user_id = webhook_event.user_id |
|
if ( |
|
recipient_email is None |
|
and recipient_user_id is not None |
|
and prisma_client is not None |
|
): |
|
user_row = await prisma_client.db.litellm_usertable.find_unique( |
|
where={"user_id": recipient_user_id} |
|
) |
|
|
|
if user_row is not None: |
|
recipient_email = user_row.user_email |
|
|
|
key_token = webhook_event.token |
|
key_budget = webhook_event.max_budget |
|
base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000") |
|
|
|
email_html_content = "Alert from LiteLLM Server" |
|
if recipient_email is None: |
|
verbose_proxy_logger.error( |
|
"Trying to send email alert to no recipient", |
|
extra=webhook_event.dict(), |
|
) |
|
|
|
if webhook_event.event == "key_created": |
|
email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format( |
|
email_logo_url=email_logo_url, |
|
recipient_email=recipient_email, |
|
key_budget=key_budget, |
|
key_token=key_token, |
|
base_url=base_url, |
|
email_support_contact=email_support_contact, |
|
) |
|
elif webhook_event.event == "internal_user_created": |
|
|
|
team_id = webhook_event.team_id |
|
team_name = "Default Team" |
|
if team_id is not None and prisma_client is not None: |
|
team_row = await prisma_client.db.litellm_teamtable.find_unique( |
|
where={"team_id": team_id} |
|
) |
|
if team_row is not None: |
|
team_name = team_row.team_alias or "-" |
|
email_html_content = USER_INVITED_EMAIL_TEMPLATE.format( |
|
email_logo_url=email_logo_url, |
|
recipient_email=recipient_email, |
|
team_name=team_name, |
|
base_url=base_url, |
|
email_support_contact=email_support_contact, |
|
) |
|
else: |
|
verbose_proxy_logger.error( |
|
"Trying to send email alert on unknown webhook event", |
|
extra=webhook_event.model_dump(), |
|
) |
|
|
|
webhook_event.model_dump_json() |
|
email_event = { |
|
"to": recipient_email, |
|
"subject": f"LiteLLM: {event_name}", |
|
"html": email_html_content, |
|
} |
|
|
|
await send_email( |
|
receiver_email=email_event["to"], |
|
subject=email_event["subject"], |
|
html=email_event["html"], |
|
) |
|
|
|
return True |
|
|
|
except Exception as e: |
|
verbose_proxy_logger.error("Error sending email alert %s", str(e)) |
|
return False |
|
|
|
async def send_email_alert_using_smtp( |
|
self, webhook_event: WebhookEvent, alert_type: str |
|
) -> bool: |
|
""" |
|
Sends structured Email alert to an SMTP server |
|
|
|
Currently only implemented for budget alerts |
|
|
|
Returns -> True if sent, False if not. |
|
""" |
|
from litellm.proxy.proxy_server import premium_user |
|
from litellm.proxy.utils import send_email |
|
|
|
email_logo_url = os.getenv( |
|
"SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None) |
|
) |
|
email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None) |
|
await self._check_if_using_premium_email_feature( |
|
premium_user, email_logo_url, email_support_contact |
|
) |
|
|
|
if email_logo_url is None: |
|
email_logo_url = LITELLM_LOGO_URL |
|
if email_support_contact is None: |
|
email_support_contact = LITELLM_SUPPORT_CONTACT |
|
|
|
event_name = webhook_event.event_message |
|
recipient_email = webhook_event.user_email |
|
user_name = webhook_event.user_id |
|
max_budget = webhook_event.max_budget |
|
email_html_content = "Alert from LiteLLM Server" |
|
if recipient_email is None: |
|
verbose_proxy_logger.error( |
|
"Trying to send email alert to no recipient", extra=webhook_event.dict() |
|
) |
|
|
|
if webhook_event.event == "budget_crossed": |
|
email_html_content = f""" |
|
<img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" /> |
|
|
|
<p> Hi {user_name}, <br/> |
|
|
|
Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br /> |
|
|
|
API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br /> |
|
|
|
If you have any questions, please send an email to {email_support_contact} <br /> <br /> |
|
|
|
Best, <br /> |
|
The LiteLLM team <br /> |
|
""" |
|
|
|
webhook_event.model_dump_json() |
|
email_event = { |
|
"to": recipient_email, |
|
"subject": f"LiteLLM: {event_name}", |
|
"html": email_html_content, |
|
} |
|
|
|
await send_email( |
|
receiver_email=email_event["to"], |
|
subject=email_event["subject"], |
|
html=email_event["html"], |
|
) |
|
if webhook_event.event_group == "team": |
|
from litellm.integrations.email_alerting import send_team_budget_alert |
|
|
|
await send_team_budget_alert(webhook_event=webhook_event) |
|
|
|
return False |
|
|
|
async def send_alert( |
|
self, |
|
message: str, |
|
level: Literal["Low", "Medium", "High"], |
|
alert_type: AlertType, |
|
alerting_metadata: dict, |
|
user_info: Optional[WebhookEvent] = None, |
|
**kwargs, |
|
): |
|
""" |
|
Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298 |
|
|
|
- Responses taking too long |
|
- Requests are hanging |
|
- Calls are failing |
|
- DB Read/Writes are failing |
|
- Proxy Close to max budget |
|
- Key Close to max budget |
|
|
|
Parameters: |
|
        level: str - Low|Medium|High - Low for informational reports (e.g. daily/spend reports), Medium if calls might fail, High if calls are failing or a budget is crossed
|
message: str - what is the alert about |
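
        Example (illustrative only - `slack_alerting` stands for a configured SlackAlerting instance):
            await slack_alerting.send_alert(
                message="Requests are hanging",
                level="High",
                alert_type=AlertType.llm_requests_hanging,
                alerting_metadata={},
            )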
|
""" |
|
if self.alerting is None: |
|
return |
|
|
|
if ( |
|
"webhook" in self.alerting |
|
and alert_type == "budget_alerts" |
|
and user_info is not None |
|
): |
|
await self.send_webhook_alert(webhook_event=user_info) |
|
|
|
if ( |
|
"email" in self.alerting |
|
and alert_type == "budget_alerts" |
|
and user_info is not None |
|
): |
|
|
|
await self.send_email_alert_using_smtp( |
|
webhook_event=user_info, alert_type=alert_type |
|
) |
|
|
|
if "slack" not in self.alerting: |
|
return |
|
if alert_type not in self.alert_types: |
|
return |
|
|
|
from datetime import datetime |
|
|
|
|
|
current_time = datetime.now().strftime("%H:%M:%S") |
|
_proxy_base_url = os.getenv("PROXY_BASE_URL", None) |
|
if alert_type == "daily_reports" or alert_type == "new_model_added": |
|
formatted_message = message |
|
else: |
|
formatted_message = ( |
|
f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}" |
|
) |
|
|
|
if kwargs: |
|
for key, value in kwargs.items(): |
|
formatted_message += f"\n\n{key}: `{value}`\n\n" |
|
if alerting_metadata: |
|
for key, value in alerting_metadata.items(): |
|
formatted_message += f"\n\n*Alerting Metadata*: \n{key}: `{value}`\n\n" |
|
if _proxy_base_url is not None: |
|
formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`" |
|
|
|
|
|
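        # resolve the Slack webhook URL: per-alert-type mapping > default_webhook_url > SLACK_WEBHOOK_URL env var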
if ( |
|
self.alert_to_webhook_url is not None |
|
and alert_type in self.alert_to_webhook_url |
|
): |
|
slack_webhook_url: Optional[Union[str, List[str]]] = ( |
|
self.alert_to_webhook_url[alert_type] |
|
) |
|
elif self.default_webhook_url is not None: |
|
slack_webhook_url = self.default_webhook_url |
|
else: |
|
slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None) |
|
|
|
if slack_webhook_url is None: |
|
raise ValueError("Missing SLACK_WEBHOOK_URL from environment") |
|
payload = {"text": formatted_message} |
|
headers = {"Content-type": "application/json"} |
|
|
|
if isinstance(slack_webhook_url, list): |
|
for url in slack_webhook_url: |
|
self.log_queue.append( |
|
{ |
|
"url": url, |
|
"headers": headers, |
|
"payload": payload, |
|
"alert_type": alert_type, |
|
} |
|
) |
|
else: |
|
self.log_queue.append( |
|
{ |
|
"url": slack_webhook_url, |
|
"headers": headers, |
|
"payload": payload, |
|
"alert_type": alert_type, |
|
} |
|
) |
|
|
|
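        # alerts are batched via the log queue; flush immediately once the batch size is reached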
if len(self.log_queue) >= self.batch_size: |
|
await self.flush_queue() |
|
|
|
async def async_send_batch(self): |
|
if not self.log_queue: |
|
return |
|
|
|
squashed_queue = squash_payloads(self.log_queue) |
|
tasks = [ |
|
send_to_webhook( |
|
slackAlertingInstance=self, item=item["item"], count=item["count"] |
|
) |
|
for item in squashed_queue.values() |
|
] |
|
await asyncio.gather(*tasks) |
|
self.log_queue.clear() |
|
|
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): |
|
"""Log deployment latency""" |
|
try: |
|
if "daily_reports" in self.alert_types: |
|
litellm_params = kwargs.get("litellm_params", {}) or {} |
|
model_info = litellm_params.get("model_info", {}) or {} |
|
model_id = model_info.get("id", "") or "" |
|
response_s: timedelta = end_time - start_time |
|
|
|
final_value = response_s |
|
|
|
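                # normalize latency to seconds-per-output-token when completion tokens are available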
if isinstance(response_obj, litellm.ModelResponse) and ( |
|
hasattr(response_obj, "usage") |
|
and response_obj.usage is not None |
|
and hasattr(response_obj.usage, "completion_tokens") |
|
): |
|
completion_tokens = response_obj.usage.completion_tokens |
|
if completion_tokens is not None and completion_tokens > 0: |
|
final_value = float( |
|
response_s.total_seconds() / completion_tokens |
|
) |
|
if isinstance(final_value, timedelta): |
|
final_value = final_value.total_seconds() |
|
|
|
await self.async_update_daily_reports( |
|
DeploymentMetrics( |
|
id=model_id, |
|
failed_request=False, |
|
latency_per_output_token=final_value, |
|
updated_at=litellm.utils.get_utc_datetime(), |
|
) |
|
) |
|
except Exception as e: |
|
verbose_proxy_logger.error( |
|
f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}" |
|
) |
|
pass |
|
|
|
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): |
|
"""Log failure + deployment latency""" |
|
_litellm_params = kwargs.get("litellm_params", {}) |
|
_model_info = _litellm_params.get("model_info", {}) or {} |
|
model_id = _model_info.get("id", "") |
|
try: |
|
if "daily_reports" in self.alert_types: |
|
try: |
|
await self.async_update_daily_reports( |
|
DeploymentMetrics( |
|
id=model_id, |
|
failed_request=True, |
|
latency_per_output_token=None, |
|
updated_at=litellm.utils.get_utc_datetime(), |
|
) |
|
) |
|
except Exception as e: |
|
verbose_logger.debug(f"Exception raises -{str(e)}") |
|
|
|
if isinstance(kwargs.get("exception", ""), APIError): |
|
if "outage_alerts" in self.alert_types: |
|
await self.outage_alerts( |
|
exception=kwargs["exception"], |
|
deployment_id=model_id, |
|
) |
|
|
|
if "region_outage_alerts" in self.alert_types: |
|
await self.region_outage_alerts( |
|
exception=kwargs["exception"], deployment_id=model_id |
|
) |
|
except Exception: |
|
pass |
|
|
|
async def _run_scheduler_helper(self, llm_router) -> bool: |
|
""" |
|
Returns: |
|
- True -> report sent |
|
- False -> report not sent |
|
""" |
|
report_sent_bool = False |
|
|
|
report_sent = await self.internal_usage_cache.async_get_cache( |
|
key=SlackAlertingCacheKeys.report_sent_key.value, |
|
parent_otel_span=None, |
|
) |
|
|
|
current_time = time.time() |
|
|
|
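        # first run: just record the current timestamp; later runs send a report once the interval has elapsed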
if report_sent is None: |
|
await self.internal_usage_cache.async_set_cache( |
|
key=SlackAlertingCacheKeys.report_sent_key.value, |
|
value=current_time, |
|
) |
|
elif isinstance(report_sent, float): |
|
|
|
interval_seconds = self.alerting_args.daily_report_frequency |
|
|
|
if current_time - report_sent >= interval_seconds: |
|
|
|
await self.send_daily_reports(router=llm_router) |
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=SlackAlertingCacheKeys.report_sent_key.value, |
|
value=current_time, |
|
) |
|
report_sent_bool = True |
|
|
|
return report_sent_bool |
|
|
|
async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None): |
|
""" |
|
If 'daily_reports' enabled |
|
|
|
Ping redis cache every 5 minutes to check if we should send the report |
|
|
|
If yes -> call send_daily_report() |
|
""" |
|
if llm_router is None or self.alert_types is None: |
|
return |
|
|
|
if "daily_reports" in self.alert_types: |
|
while True: |
|
await self._run_scheduler_helper(llm_router=llm_router) |
|
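                # sleep for the configured check interval, randomized by +/- 3 seconds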
interval = random.randint( |
|
self.alerting_args.report_check_interval - 3, |
|
self.alerting_args.report_check_interval + 3, |
|
) |
|
await asyncio.sleep(interval) |
|
return |
|
|
|
async def send_weekly_spend_report( |
|
self, |
|
time_range: str = "7d", |
|
): |
|
""" |
|
Send a spend report for a configurable time range. |
|
|
|
Args: |
|
time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d" |
|
""" |
|
if self.alerting is None or "spend_reports" not in self.alert_types: |
|
return |
|
|
|
try: |
|
from litellm.proxy.spend_tracking.spend_management_endpoints import ( |
|
_get_spend_report_for_time_range, |
|
) |
|
|
|
|
|
            if time_range[-1].lower() != "d":
                raise ValueError("Time range must be specified in days, e.g., '7d'")
            days = int(time_range[:-1])
|
|
|
todays_date = datetime.datetime.now().date() |
|
start_date = todays_date - datetime.timedelta(days=days) |
|
|
|
_event_cache_key = f"weekly_spend_report_sent_{start_date.strftime('%Y-%m-%d')}_{todays_date.strftime('%Y-%m-%d')}" |
|
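            # skip if a report for this exact date range was already sent; the cache entry expires with the time range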
if await self.internal_usage_cache.async_get_cache(key=_event_cache_key): |
|
return |
|
|
|
_resp = await _get_spend_report_for_time_range( |
|
start_date=start_date.strftime("%Y-%m-%d"), |
|
end_date=todays_date.strftime("%Y-%m-%d"), |
|
) |
|
if _resp is None or _resp == ([], []): |
|
return |
|
|
|
spend_per_team, spend_per_tag = _resp |
|
|
|
_spend_message = f"*💸 Spend Report for `{start_date.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` ({days} days)*\n" |
|
|
|
if spend_per_team is not None: |
|
_spend_message += "\n*Team Spend Report:*\n" |
|
for spend in spend_per_team: |
|
_team_spend = round(float(spend["total_spend"]), 4) |
|
_spend_message += ( |
|
f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n" |
|
) |
|
|
|
if spend_per_tag is not None: |
|
_spend_message += "\n*Tag Spend Report:*\n" |
|
for spend in spend_per_tag: |
|
_tag_spend = round(float(spend["total_spend"]), 4) |
|
_spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n" |
|
|
|
await self.send_alert( |
|
message=_spend_message, |
|
level="Low", |
|
alert_type=AlertType.spend_reports, |
|
alerting_metadata={}, |
|
) |
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=_event_cache_key, |
|
value="SENT", |
|
ttl=duration_in_seconds(time_range), |
|
) |
|
|
|
except ValueError as ve: |
|
verbose_proxy_logger.error(f"Invalid time range format: {ve}") |
|
except Exception as e: |
|
verbose_proxy_logger.error(f"Error sending spend report: {e}") |
|
|
|
async def send_monthly_spend_report(self): |
|
""" """ |
|
try: |
|
from calendar import monthrange |
|
|
|
from litellm.proxy.spend_tracking.spend_management_endpoints import ( |
|
_get_spend_report_for_time_range, |
|
) |
|
|
|
todays_date = datetime.datetime.now().date() |
|
first_day_of_month = todays_date.replace(day=1) |
|
_, last_day_of_month = monthrange(todays_date.year, todays_date.month) |
|
last_day_of_month = first_day_of_month + datetime.timedelta( |
|
days=last_day_of_month - 1 |
|
) |
|
|
|
_event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}" |
|
if await self.internal_usage_cache.async_get_cache(key=_event_cache_key): |
|
return |
|
|
|
_resp = await _get_spend_report_for_time_range( |
|
start_date=first_day_of_month.strftime("%Y-%m-%d"), |
|
end_date=last_day_of_month.strftime("%Y-%m-%d"), |
|
) |
|
|
|
if _resp is None or _resp == ([], []): |
|
return |
|
|
|
monthly_spend_per_team, monthly_spend_per_tag = _resp |
|
|
|
_spend_message = f"*💸 Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n" |
|
|
|
if monthly_spend_per_team is not None: |
|
_spend_message += "\n*Team Spend Report:*\n" |
|
for spend in monthly_spend_per_team: |
|
_team_spend = spend["total_spend"] |
|
_team_spend = float(_team_spend) |
|
|
|
_team_spend = round(_team_spend, 4) |
|
_spend_message += ( |
|
f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n" |
|
) |
|
|
|
if monthly_spend_per_tag is not None: |
|
_spend_message += "\n*Tag Spend Report:*\n" |
|
for spend in monthly_spend_per_tag: |
|
_tag_spend = spend["total_spend"] |
|
_tag_spend = float(_tag_spend) |
|
|
|
_tag_spend = round(_tag_spend, 4) |
|
_spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n" |
|
|
|
await self.send_alert( |
|
message=_spend_message, |
|
level="Low", |
|
alert_type=AlertType.spend_reports, |
|
alerting_metadata={}, |
|
) |
|
|
|
await self.internal_usage_cache.async_set_cache( |
|
key=_event_cache_key, |
|
value="SENT", |
|
ttl=(30 * 24 * 60 * 60), |
|
) |
|
|
|
except Exception as e: |
|
verbose_proxy_logger.exception("Error sending weekly spend report %s", e) |
|
|
|
async def send_fallback_stats_from_prometheus(self): |
|
""" |
|
Helper to send fallback statistics from prometheus server -> to slack |
|
|
|
This runs once per day and sends an overview of all the fallback statistics |
|
""" |
|
try: |
|
from litellm.integrations.prometheus_helpers.prometheus_api import ( |
|
get_fallback_metric_from_prometheus, |
|
) |
|
|
|
|
|
            fallback_success_info_prometheus = (
                await get_fallback_metric_from_prometheus()
            )

            fallback_message = (
                f"*Fallback Statistics:*\n{fallback_success_info_prometheus}"
            )
|
|
|
await self.send_alert( |
|
message=fallback_message, |
|
level="Low", |
|
alert_type=AlertType.fallback_reports, |
|
alerting_metadata={}, |
|
) |
|
|
|
except Exception as e: |
|
verbose_proxy_logger.error("Error sending weekly spend report %s", e) |
|
|
|
pass |
|
|
|
async def send_virtual_key_event_slack( |
|
self, |
|
key_event: VirtualKeyEvent, |
|
alert_type: AlertType, |
|
event_name: str, |
|
): |
|
""" |
|
Handles sending Virtual Key related alerts |
|
|
|
Example: |
|
- New Virtual Key Created |
|
- Internal User Updated |
|
- Team Created, Updated, Deleted |
|
""" |
|
try: |
|
|
|
message = f"`{event_name}`\n" |
|
|
|
key_event_dict = key_event.model_dump() |
|
|
|
|
|
message += "*Action Done by:*\n" |
|
for key, value in key_event_dict.items(): |
|
if "created_by" in key: |
|
message += f"{key}: `{value}`\n" |
|
|
|
|
|
message += "\n*Arguments passed:*\n" |
|
request_kwargs = key_event.request_kwargs |
|
for key, value in request_kwargs.items(): |
|
if key == "user_api_key_dict": |
|
continue |
|
message += f"{key}: `{value}`\n" |
|
|
|
await self.send_alert( |
|
message=message, |
|
level="High", |
|
alert_type=alert_type, |
|
alerting_metadata={}, |
|
) |
|
|
|
except Exception as e: |
|
verbose_proxy_logger.error( |
|
"Error sending send_virtual_key_event_slack %s", e |
|
) |
|
|
|
return |
|
|
|
async def _request_is_completed(self, request_data: Optional[dict]) -> bool: |
|
""" |
|
Returns True if the request is completed - either as a success or failure |
|
""" |
|
if request_data is None: |
|
return False |
|
|
|
if ( |
|
request_data.get("litellm_status", "") != "success" |
|
and request_data.get("litellm_status", "") != "fail" |
|
): |
|
|
|
litellm_call_id = request_data.get("litellm_call_id", "") |
|
status: Optional[str] = await self.internal_usage_cache.async_get_cache( |
|
key="request_status:{}".format(litellm_call_id), local_only=True |
|
) |
|
if status is not None and (status == "success" or status == "fail"): |
|
return True |
|
return False |
|
|