|
""" |
|
Utils used for slack alerting |
|
""" |
|
|
|
import asyncio |
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union |
|
|
|
from litellm.proxy._types import AlertType |
|
from litellm.secret_managers.main import get_secret |
|
|
|
if TYPE_CHECKING: |
|
from litellm.litellm_core_utils.litellm_logging import Logging as _Logging |
|
|
|
Logging = _Logging |
|
else: |
|
Logging = Any |
|
|
|
|
|
def process_slack_alerting_variables( |
|
alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] |
|
) -> Optional[Dict[AlertType, Union[List[str], str]]]: |
|
""" |
|
process alert_to_webhook_url |
|
- check if any urls are set as os.environ/SLACK_WEBHOOK_URL_1 read env var and set the correct value |
|
""" |
|
if alert_to_webhook_url is None: |
|
return None |
|
|
|
for alert_type, webhook_urls in alert_to_webhook_url.items(): |
|
if isinstance(webhook_urls, list): |
|
_webhook_values: List[str] = [] |
|
for webhook_url in webhook_urls: |
|
if "os.environ/" in webhook_url: |
|
_env_value = get_secret(secret_name=webhook_url) |
|
if not isinstance(_env_value, str): |
|
raise ValueError( |
|
f"Invalid webhook url value for: {webhook_url}. Got type={type(_env_value)}" |
|
) |
|
_webhook_values.append(_env_value) |
|
else: |
|
_webhook_values.append(webhook_url) |
|
|
|
alert_to_webhook_url[alert_type] = _webhook_values |
|
else: |
|
_webhook_value_str: str = webhook_urls |
|
if "os.environ/" in webhook_urls: |
|
_env_value = get_secret(secret_name=webhook_urls) |
|
if not isinstance(_env_value, str): |
|
raise ValueError( |
|
f"Invalid webhook url value for: {webhook_urls}. Got type={type(_env_value)}" |
|
) |
|
_webhook_value_str = _env_value |
|
else: |
|
_webhook_value_str = webhook_urls |
|
|
|
alert_to_webhook_url[alert_type] = _webhook_value_str |
|
|
|
return alert_to_webhook_url |
|
|
|
|
|
async def _add_langfuse_trace_id_to_alert( |
|
request_data: Optional[dict] = None, |
|
) -> Optional[str]: |
|
""" |
|
Returns langfuse trace url |
|
|
|
- check: |
|
-> existing_trace_id |
|
-> trace_id |
|
-> litellm_call_id |
|
""" |
|
|
|
if ( |
|
request_data is not None |
|
and request_data.get("litellm_logging_obj", None) is not None |
|
): |
|
trace_id: Optional[str] = None |
|
litellm_logging_obj: Logging = request_data["litellm_logging_obj"] |
|
|
|
for _ in range(3): |
|
trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse") |
|
if trace_id is not None: |
|
break |
|
await asyncio.sleep(3) |
|
|
|
_langfuse_object = litellm_logging_obj._get_callback_object( |
|
service_name="langfuse" |
|
) |
|
if _langfuse_object is not None: |
|
base_url = _langfuse_object.Langfuse.base_url |
|
return f"{base_url}/trace/{trace_id}" |
|
return None |
|
|