File size: 4,427 Bytes
e3278e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
What is this? 

Logging Pass-Through Endpoints
"""

"""
1. Create pass-through endpoints that map any LITELLM_BASE_URL/langfuse/<endpoint> to LANGFUSE_BASE_URL/<endpoint>
"""

import base64
import os
from base64 import b64encode
from typing import Optional

import httpx
from fastapi import APIRouter, Request, Response

import litellm
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.litellm_pre_call_utils import _get_dynamic_logging_metadata
from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
    create_pass_through_route,
)

# Router on which the Langfuse pass-through route below is registered.
router = APIRouter()
# NOTE(review): not referenced anywhere in the visible part of this file; the
# name suggests a leftover from a Vertex pass-through module — confirm before
# relying on or removing it.
default_vertex_config = None


def create_request_copy(request: Request):
    """
    Capture the basic attributes of an incoming request in a plain dict.

    Every mapping-valued field (headers, cookies, query params) is copied into
    a fresh `dict`, so mutating the returned snapshot never writes through to
    the mappings held by the original request object.

    Args:
        request: The incoming request. Only its `method`, `url`, `headers`,
            `cookies` and `query_params` attributes are read.

    Returns:
        dict with the keys "method", "url" (stringified), "headers",
        "cookies" and "query_params".
    """
    return {
        "method": request.method,
        "url": str(request.url),
        "headers": dict(request.headers),
        # copied like the other mappings; previously returned by reference
        "cookies": dict(request.cookies),
        "query_params": dict(request.query_params),
    }


def _extract_litellm_key_from_basic_auth(authorization_header: str) -> str:
    """
    Return the password part of an HTTP Basic `Authorization` header value.

    Callers of the Langfuse route are expected to send
    `Basic base64(<public_key>:<secret_key>)` with the LiteLLM API key in the
    secret-key (password) position.

    Args:
        authorization_header: Raw header value, with or without the leading
            `Basic ` scheme prefix.

    Returns:
        Everything after the first `:` of the decoded credentials.

    Raises:
        ValueError: If the value is not valid base64 / UTF-8, or if it does not
            contain a non-empty password part.
    """
    credentials = authorization_header.strip()
    # The auth scheme token is case-insensitive; only strip it as a prefix.
    if credentials[:6].lower() == "basic ":
        credentials = credentials[6:].strip()

    # binascii.Error and UnicodeDecodeError are both ValueError subclasses,
    # so callers only need to handle ValueError.
    decoded_str = base64.b64decode(credentials).decode("utf-8")

    # Split on the FIRST colon only: the password may itself contain ':'.
    _, separator, secret_key = decoded_str.partition(":")
    if not separator or not secret_key:
        raise ValueError("Basic credentials must be '<public_key>:<secret_key>'")
    return secret_key


@router.api_route(
    "/langfuse/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["Langfuse Pass-through", "pass-through"],
)
async def langfuse_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
):
    """
    Call Langfuse via LiteLLM proxy. Works with Langfuse SDK.

    [Docs](https://docs.litellm.ai/docs/pass_through/langfuse)

    The LiteLLM API key must be sent as the secret key (password) of an HTTP
    Basic `Authorization` header. A missing or malformed header is rejected
    with a 401 response.
    """
    # Local imports: proxy_config is imported lazily as in the original code;
    # HTTPException is only needed on the error path.
    from fastapi import HTTPException

    from litellm.proxy.proxy_server import proxy_config

    ## READ THE LITELLM API KEY FROM THE BASIC-AUTH `Authorization` HEADER
    try:
        api_key = _extract_litellm_key_from_basic_auth(
            request.headers.get("Authorization") or ""
        )
    except ValueError as e:
        # Previously surfaced as an unhandled exception (HTTP 500).
        raise HTTPException(
            status_code=401,
            detail="Invalid Authorization header. Expected HTTP Basic auth with the LiteLLM API key as the secret key.",
        ) from e

    user_api_key_dict = await user_api_key_auth(
        request=request, api_key="Bearer {}".format(api_key)
    )

    ## RESOLVE PER-KEY / PER-TEAM LANGFUSE SETTINGS (if any)
    callback_settings_obj: Optional[TeamCallbackMetadata] = (
        _get_dynamic_logging_metadata(
            user_api_key_dict=user_api_key_dict, proxy_config=proxy_config
        )
    )

    dynamic_langfuse_public_key: Optional[str] = None
    dynamic_langfuse_secret_key: Optional[str] = None
    dynamic_langfuse_host: Optional[str] = None
    if (
        callback_settings_obj is not None
        and callback_settings_obj.callback_vars is not None
    ):
        for k, v in callback_settings_obj.callback_vars.items():
            if k == "langfuse_public_key":
                dynamic_langfuse_public_key = v
            elif k == "langfuse_secret_key":
                dynamic_langfuse_secret_key = v
            elif k == "langfuse_host":
                dynamic_langfuse_host = v

    # Precedence: dynamic host -> LANGFUSE_HOST env var -> Langfuse cloud.
    # `or` also covers an env var that is set but empty.
    base_target_url: str = (
        dynamic_langfuse_host
        or os.getenv("LANGFUSE_HOST")
        or "https://cloud.langfuse.com"
    )
    if not base_target_url.startswith(("http://", "https://")):
        # add http:// if unset, assume communicating over private network - e.g. render
        base_target_url = "http://" + base_target_url

    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure endpoint starts with '/' for proper URL construction
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    # Construct the full target URL using httpx
    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    # Langfuse credentials: dynamic values win over the configured secrets.
    # NOTE(review): if neither source provides a key, the literal text "None"
    # ends up in the forwarded credentials — confirm whether that should be
    # rejected here instead of being sent upstream.
    langfuse_public_key = dynamic_langfuse_public_key or litellm.utils.get_secret(
        secret_name="LANGFUSE_PUBLIC_KEY"
    )
    langfuse_secret_key = dynamic_langfuse_secret_key or litellm.utils.get_secret(
        secret_name="LANGFUSE_SECRET_KEY"
    )

    # Replace the caller's Authorization header (LiteLLM key) with Langfuse's.
    langfuse_combined_key = "Basic " + b64encode(
        f"{langfuse_public_key}:{langfuse_secret_key}".encode("utf-8")
    ).decode("ascii")

    ## CREATE PASS-THROUGH
    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(updated_url),
        custom_headers={"Authorization": langfuse_combined_key},
    )  # dynamically construct pass-through endpoint based on incoming path
    received_value = await endpoint_func(
        request,
        fastapi_response,
        user_api_key_dict,
        query_params=dict(request.query_params),  # type: ignore
    )

    return received_value