TestLLM / litellm /proxy /vertex_ai_endpoints /langfuse_endpoints.py
Raju2024's picture
Upload 1072 files
e3278e4 verified
raw
history blame
4.43 kB
"""
What is this?
Logging Pass-Through Endpoints
"""
"""
1. Create pass-through endpoints for any LITELLM_BASE_URL/langfuse/<endpoint> map to LANGFUSE_BASE_URL/<endpoint>
"""
import base64
import os
from base64 import b64encode
from typing import Optional
import httpx
from fastapi import APIRouter, Request, Response
import litellm
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.litellm_pre_call_utils import _get_dynamic_logging_metadata
from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
create_pass_through_route,
)
router = APIRouter()
default_vertex_config = None
def create_request_copy(request: Request):
return {
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers),
"cookies": request.cookies,
"query_params": dict(request.query_params),
}
@router.api_route(
"/langfuse/{endpoint:path}",
methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
tags=["Langfuse Pass-through", "pass-through"],
)
async def langfuse_proxy_route(
endpoint: str,
request: Request,
fastapi_response: Response,
):
"""
Call Langfuse via LiteLLM proxy. Works with Langfuse SDK.
[Docs](https://docs.litellm.ai/docs/pass_through/langfuse)
"""
from litellm.proxy.proxy_server import proxy_config
## CHECK FOR LITELLM API KEY IN THE QUERY PARAMS - ?..key=LITELLM_API_KEY
api_key = request.headers.get("Authorization") or ""
## decrypt base64 hash
api_key = api_key.replace("Basic ", "")
decoded_bytes = base64.b64decode(api_key)
decoded_str = decoded_bytes.decode("utf-8")
api_key = decoded_str.split(":")[1] # assume api key is passed in as secret key
user_api_key_dict = await user_api_key_auth(
request=request, api_key="Bearer {}".format(api_key)
)
callback_settings_obj: Optional[TeamCallbackMetadata] = (
_get_dynamic_logging_metadata(
user_api_key_dict=user_api_key_dict, proxy_config=proxy_config
)
)
dynamic_langfuse_public_key: Optional[str] = None
dynamic_langfuse_secret_key: Optional[str] = None
dynamic_langfuse_host: Optional[str] = None
if (
callback_settings_obj is not None
and callback_settings_obj.callback_vars is not None
):
for k, v in callback_settings_obj.callback_vars.items():
if k == "langfuse_public_key":
dynamic_langfuse_public_key = v
elif k == "langfuse_secret_key":
dynamic_langfuse_secret_key = v
elif k == "langfuse_host":
dynamic_langfuse_host = v
base_target_url: str = (
dynamic_langfuse_host
or os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
or "https://cloud.langfuse.com"
)
if not (
base_target_url.startswith("http://") or base_target_url.startswith("https://")
):
# add http:// if unset, assume communicating over private network - e.g. render
base_target_url = "http://" + base_target_url
encoded_endpoint = httpx.URL(endpoint).path
# Ensure endpoint starts with '/' for proper URL construction
if not encoded_endpoint.startswith("/"):
encoded_endpoint = "/" + encoded_endpoint
# Construct the full target URL using httpx
base_url = httpx.URL(base_target_url)
updated_url = base_url.copy_with(path=encoded_endpoint)
# Add or update query parameters
langfuse_public_key = dynamic_langfuse_public_key or litellm.utils.get_secret(
secret_name="LANGFUSE_PUBLIC_KEY"
)
langfuse_secret_key = dynamic_langfuse_secret_key or litellm.utils.get_secret(
secret_name="LANGFUSE_SECRET_KEY"
)
langfuse_combined_key = "Basic " + b64encode(
f"{langfuse_public_key}:{langfuse_secret_key}".encode("utf-8")
).decode("ascii")
## CREATE PASS-THROUGH
endpoint_func = create_pass_through_route(
endpoint=endpoint,
target=str(updated_url),
custom_headers={"Authorization": langfuse_combined_key},
) # dynamically construct pass-through endpoint based on incoming path
received_value = await endpoint_func(
request,
fastapi_response,
user_api_key_dict,
query_params=dict(request.query_params), # type: ignore
)
return received_value