File size: 3,108 Bytes
e3278e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Utils used for slack alerting
"""

import asyncio
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from litellm.proxy._types import AlertType
from litellm.secret_managers.main import get_secret

# `Logging` is only needed for annotations in this module: static type
# checkers see the real class, while at runtime the name is a plain alias
# for `Any`, so the litellm_logging module is not imported from here.
if TYPE_CHECKING:
    from litellm.litellm_core_utils.litellm_logging import Logging as _Logging

    Logging = _Logging
else:
    Logging = Any


def process_slack_alerting_variables(
    alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]]
) -> Optional[Dict[AlertType, Union[List[str], str]]]:
    """
    Resolve secret references inside an alert -> webhook url mapping.

    Any url containing "os.environ/" (e.g. os.environ/SLACK_WEBHOOK_URL_1) is
    looked up through `get_secret` and replaced by the returned value; every
    other url is kept unchanged.

    The mapping is updated in place and the same object is returned. `None`
    is passed straight through.

    Raises:
        ValueError: if `get_secret` returns something that is not a string
            for a referenced url.
    """

    def _resolve(raw_url: str) -> str:
        # Plain urls need no lookup.
        if "os.environ/" not in raw_url:
            return raw_url

        resolved = get_secret(secret_name=raw_url)
        if isinstance(resolved, str):
            return resolved
        raise ValueError(
            f"Invalid webhook url value for: {raw_url}. Got type={type(resolved)}"
        )

    if alert_to_webhook_url is None:
        return None

    # Only values of existing keys are reassigned, so updating the dict while
    # iterating over its items is safe.
    for alert_type, configured in alert_to_webhook_url.items():
        if isinstance(configured, list):
            alert_to_webhook_url[alert_type] = [_resolve(url) for url in configured]
        else:
            alert_to_webhook_url[alert_type] = _resolve(configured)

    return alert_to_webhook_url


async def _add_langfuse_trace_id_to_alert(
    request_data: Optional[dict] = None,
    *,
    max_attempts: int = 3,
    retry_delay_s: float = 3.0,
) -> Optional[str]:
    """
    Returns langfuse trace url, or None when no url can be built.

    The trace id is obtained from request_data["litellm_logging_obj"] via
    `_get_trace_id(service_name="langfuse")`. Per the original notes the id
    is presumably resolved from, in order (not verifiable from this module):
    -> existing_trace_id
    -> trace_id
    -> litellm_call_id

    Args:
        request_data: request payload; must carry a non-None
            "litellm_logging_obj" entry for a url to be produced.
        max_attempts: how many times the trace id lookup is tried.
        retry_delay_s: seconds to wait between two lookups.

    Returns:
        "<base_url>/trace/<trace_id>" when both a trace id and a langfuse
        callback object are available, otherwise None.
    """
    if request_data is None:
        return None

    litellm_logging_obj: Optional[Logging] = request_data.get(
        "litellm_logging_obj", None
    )
    if litellm_logging_obj is None:
        return None

    trace_id: Optional[str] = None
    for attempt in range(max_attempts):
        trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse")
        if trace_id is not None:
            break
        # Wait only if another attempt follows; sleeping after the last
        # lookup would just delay the alert.
        if attempt < max_attempts - 1:
            await asyncio.sleep(retry_delay_s)

    # Without a trace id there is nothing to link to; never emit a
    # ".../trace/None" url.
    if trace_id is None:
        return None

    _langfuse_object = litellm_logging_obj._get_callback_object(
        service_name="langfuse"
    )
    if _langfuse_object is None:
        return None

    base_url = _langfuse_object.Langfuse.base_url
    return f"{base_url}/trace/{trace_id}"