|
""" |
|
What is this? |
|
|
|
Provider-specific Pass-Through Endpoints |
|
|
|
Use litellm with Anthropic SDK, Vertex AI SDK, Cohere SDK, etc. |
|
""" |
|
|
|
import json
from typing import Optional

import httpx
from fastapi import APIRouter, Depends, HTTPException, Request, Response

import litellm
from litellm.constants import BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
    create_pass_through_route,
)
from litellm.secret_managers.main import get_secret_str
|
|
|
router = APIRouter() |
|
default_vertex_config = None |
|
|
|
|
|
def create_request_copy(request: Request):
    """Return a plain-dict snapshot of *request*.

    Captures the method, URL, headers, cookies, and query params so they can
    be inspected/logged without holding onto the live Request object.
    """
    snapshot = {
        "method": request.method,
        "url": str(request.url),
    }
    snapshot["headers"] = dict(request.headers)
    snapshot["cookies"] = request.cookies
    snapshot["query_params"] = dict(request.query_params)
    return snapshot
|
|
|
|
|
@router.api_route(
    "/gemini/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["Google AI Studio Pass-through", "pass-through"],
)
async def gemini_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
):
    """
    Pass-through proxy to the Google AI Studio (Gemini) API.

    [Docs](https://docs.litellm.ai/docs/pass_through/google_ai_studio)

    Raises:
        Exception: if 'GEMINI_API_KEY' is not set in the environment.
    """
    # Google AI Studio clients send the key either as a `?key=` query param or
    # the `x-goog-api-key` header; litellm treats it as the virtual key.
    google_ai_studio_api_key = request.query_params.get("key") or request.headers.get(
        "x-goog-api-key"
    )

    user_api_key_dict = await user_api_key_auth(
        request=request, api_key=f"Bearer {google_ai_studio_api_key}"
    )

    base_target_url = "https://generativelanguage.googleapis.com"
    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure a leading slash so copy_with() yields a valid absolute path.
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    # Use get_secret_str (already imported and used by the azure/openai routes
    # below) so the return type actually matches the Optional[str] annotation,
    # instead of the untyped litellm.utils.get_secret.
    gemini_api_key: Optional[str] = get_secret_str(secret_name="GEMINI_API_KEY")
    if gemini_api_key is None:
        raise Exception(
            "Required 'GEMINI_API_KEY' in environment to make pass-through calls to Google AI Studio."
        )

    # Replace the caller-supplied virtual key with the real upstream API key.
    merged_params = dict(request.query_params)
    merged_params.update({"key": gemini_api_key})

    # Streaming is detected from the URL path (e.g. `:streamGenerateContent`),
    # not from the request body.
    is_streaming_request = "stream" in str(updated_url)

    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(updated_url),
    )
    received_value = await endpoint_func(
        request,
        fastapi_response,
        user_api_key_dict,
        query_params=merged_params,
        stream=is_streaming_request,
    )

    return received_value
|
|
|
|
|
@router.api_route(
    "/cohere/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["Cohere Pass-through", "pass-through"],
)
async def cohere_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Pass-through proxy to the Cohere API.

    [Docs](https://docs.litellm.ai/docs/pass_through/cohere)

    Raises:
        Exception: if 'COHERE_API_KEY' is not set in the environment.
    """
    base_target_url = "https://api.cohere.com"
    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure a leading slash so copy_with() yields a valid absolute path.
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    # Use get_secret_str for a typed Optional[str], consistent with the other
    # routes in this file. Fail fast instead of forwarding "Bearer None".
    cohere_api_key = get_secret_str(secret_name="COHERE_API_KEY")
    if cohere_api_key is None:
        raise Exception(
            "Required 'COHERE_API_KEY' in environment to make pass-through calls to Cohere."
        )

    # Streaming is detected from the URL path, not from the request body.
    is_streaming_request = "stream" in str(updated_url)

    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(updated_url),
        custom_headers={"Authorization": "Bearer {}".format(cohere_api_key)},
    )
    received_value = await endpoint_func(
        request,
        fastapi_response,
        user_api_key_dict,
        stream=is_streaming_request,
    )

    return received_value
|
|
|
|
|
@router.api_route(
    "/anthropic/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["Anthropic Pass-through", "pass-through"],
)
async def anthropic_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Pass-through proxy to the Anthropic API.

    [Docs](https://docs.litellm.ai/docs/anthropic_completion)

    Raises:
        Exception: if 'ANTHROPIC_API_KEY' is not set in the environment.
    """
    base_target_url = "https://api.anthropic.com"
    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure a leading slash so copy_with() yields a valid absolute path.
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    # Use get_secret_str for a typed Optional[str], consistent with the other
    # routes in this file. Fail fast instead of forwarding "x-api-key: None".
    anthropic_api_key = get_secret_str(secret_name="ANTHROPIC_API_KEY")
    if anthropic_api_key is None:
        raise Exception(
            "Required 'ANTHROPIC_API_KEY' in environment to make pass-through calls to Anthropic."
        )

    # Anthropic signals streaming via `"stream": true` in the POST body,
    # not via the URL.
    is_streaming_request = False
    if request.method == "POST":
        _request_body = await request.json()
        if _request_body.get("stream"):
            is_streaming_request = True

    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(updated_url),
        custom_headers={"x-api-key": "{}".format(anthropic_api_key)},
        _forward_headers=True,
    )
    received_value = await endpoint_func(
        request,
        fastapi_response,
        user_api_key_dict,
        stream=is_streaming_request,
    )

    return received_value
|
|
|
|
|
@router.api_route(
    "/bedrock/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["Bedrock Pass-through", "pass-through"],
)
async def bedrock_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Pass-through proxy to AWS Bedrock, signing requests with SigV4.

    [Docs](https://docs.litellm.ai/docs/pass_through/bedrock)

    Raises:
        ImportError: if boto3/botocore is not installed.
        HTTPException: 400 if the request body is not valid JSON.
    """
    try:
        from botocore.auth import SigV4Auth
        from botocore.awsrequest import AWSRequest
        from botocore.credentials import Credentials
    except ImportError:
        raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")

    # Agent-runtime routes (e.g. knowledge bases) live on a different host
    # than model invocation routes.
    aws_region_name = get_secret_str(secret_name="AWS_REGION_NAME")
    if _is_bedrock_agent_runtime_route(endpoint=endpoint):
        base_target_url = (
            f"https://bedrock-agent-runtime.{aws_region_name}.amazonaws.com"
        )
    else:
        base_target_url = f"https://bedrock-runtime.{aws_region_name}.amazonaws.com"
    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure a leading slash so copy_with() yields a valid absolute path.
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    from litellm.llms.bedrock.chat import BedrockConverseLLM

    credentials: Credentials = BedrockConverseLLM().get_credentials()
    sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name)
    headers = {"Content-Type": "application/json"}

    try:
        data = await request.json()
    except Exception as e:
        # str(e), not the exception object: an Exception instance is not
        # JSON-serializable, which would turn this 400 into a 500.
        raise HTTPException(status_code=400, detail={"error": str(e)})
    # Sign the exact bytes we will forward; the prepared request carries the
    # SigV4 Authorization header.
    _request = AWSRequest(
        method="POST", url=str(updated_url), data=json.dumps(data), headers=headers
    )
    sigv4.add_auth(_request)
    prepped = _request.prepare()

    # Streaming is detected from the URL path (e.g. `invoke-with-response-stream`).
    is_streaming_request = "stream" in str(updated_url)

    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(prepped.url),
        custom_headers=prepped.headers,
    )
    received_value = await endpoint_func(
        request,
        fastapi_response,
        user_api_key_dict,
        stream=is_streaming_request,
        custom_body=data,
        query_params={},
    )

    return received_value
|
|
|
|
|
def _is_bedrock_agent_runtime_route(endpoint: str) -> bool:
    """
    Return True if *endpoint* should be routed to `bedrock-agent-runtime`.

    Matches when any known agent-runtime route fragment appears anywhere in
    the endpoint path.
    """
    return any(route in endpoint for route in BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES)
|
|
|
|
|
@router.api_route(
    "/assemblyai/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["AssemblyAI Pass-through", "pass-through"],
)
async def assemblyai_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Pass-through proxy to the AssemblyAI API.

    [Docs](https://api.assemblyai.com)

    Raises:
        Exception: if 'ASSEMBLYAI_API_KEY' is not set in the environment.
    """
    base_target_url = "https://api.assemblyai.com"
    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure a leading slash so copy_with() yields a valid absolute path.
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    # Use get_secret_str for a typed Optional[str], consistent with the other
    # routes in this file. Fail fast instead of forwarding "Authorization: None".
    assemblyai_api_key = get_secret_str(secret_name="ASSEMBLYAI_API_KEY")
    if assemblyai_api_key is None:
        raise Exception(
            "Required 'ASSEMBLYAI_API_KEY' in environment to make pass-through calls to AssemblyAI."
        )

    # Streaming is signalled via `"stream": true` in a JSON POST body. Not all
    # AssemblyAI POSTs are JSON (e.g. raw audio uploads to /v2/upload), so a
    # non-JSON body must not blow up here — treat it as non-streaming.
    is_streaming_request = False
    if request.method == "POST":
        try:
            _request_body = await request.json()
        except Exception:
            _request_body = {}
        if _request_body.get("stream"):
            is_streaming_request = True

    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(updated_url),
        custom_headers={"Authorization": "{}".format(assemblyai_api_key)},
    )
    received_value = await endpoint_func(
        request=request,
        fastapi_response=fastapi_response,
        user_api_key_dict=user_api_key_dict,
        stream=is_streaming_request,
    )

    return received_value
|
|
|
|
|
@router.api_route(
    "/azure/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["Azure Pass-through", "pass-through"],
)
async def azure_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Call any azure endpoint using the proxy.

    Just use `{PROXY_BASE_URL}/azure/{endpoint:path}`
    """
    # Both the target base URL and the API key come from the environment /
    # secret manager; bail out early with a clear message if either is missing.
    azure_api_base = get_secret_str(secret_name="AZURE_API_BASE")
    if azure_api_base is None:
        raise Exception(
            "Required 'AZURE_API_BASE' in environment to make pass-through calls to Azure."
        )

    azure_key = get_secret_str(secret_name="AZURE_API_KEY")
    if azure_key is None:
        raise Exception(
            "Required 'AZURE_API_KEY' in environment to make pass-through calls to Azure."
        )

    # Delegate to the shared OpenAI-compatible pass-through plumbing.
    return await _base_openai_pass_through_handler(
        endpoint=endpoint,
        request=request,
        fastapi_response=fastapi_response,
        user_api_key_dict=user_api_key_dict,
        base_target_url=azure_api_base,
        api_key=azure_key,
    )
|
|
|
|
|
@router.api_route(
    "/openai/{endpoint:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    tags=["OpenAI Pass-through", "pass-through"],
)
async def openai_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Simple pass-through for OpenAI. Use this if you want to directly send a request to OpenAI.


    """
    # The OpenAI API key must be present in the environment / secret manager.
    openai_key = get_secret_str(secret_name="OPENAI_API_KEY")
    if openai_key is None:
        raise Exception(
            "Required 'OPENAI_API_KEY' in environment to make pass-through calls to OpenAI."
        )

    # Delegate to the shared OpenAI-compatible pass-through plumbing.
    return await _base_openai_pass_through_handler(
        endpoint=endpoint,
        request=request,
        fastapi_response=fastapi_response,
        user_api_key_dict=user_api_key_dict,
        base_target_url="https://api.openai.com",
        api_key=openai_key,
    )
|
|
|
|
|
async def _base_openai_pass_through_handler(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
    user_api_key_dict: UserAPIKeyAuth,
    base_target_url: str,
    api_key: str,
):
    """
    Shared pass-through plumbing for OpenAI-compatible targets (OpenAI, Azure).

    Builds the upstream URL from *base_target_url* + *endpoint*, attaches the
    API key as both `authorization` and `api-key` headers, and forwards the
    request (with its original query params) via `create_pass_through_route`.
    """
    # Normalize the endpoint into an absolute path for copy_with().
    path = httpx.URL(endpoint).path
    if not path.startswith("/"):
        path = f"/{path}"

    target_url = httpx.URL(base_target_url).copy_with(path=path)

    # Streaming is detected from the URL path, not from the request body.
    stream_requested = "stream" in str(target_url)

    handler = create_pass_through_route(
        endpoint=endpoint,
        target=str(target_url),
        custom_headers={
            "authorization": f"Bearer {api_key}",
            "api-key": f"{api_key}",
        },
    )
    return await handler(
        request,
        fastapi_response,
        user_api_key_dict,
        stream=stream_requested,
        query_params=dict(request.query_params),
    )
|
|