|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import sys |
|
|
|
# Put the directory two levels above the current working directory at the
# front of the import search path.
# NOTE(review): presumably this lets the module resolve the `litellm` package
# when run from inside the repository checkout - confirm it is still needed.
sys.path.insert(0, os.path.abspath("../.."))
|
import json |
|
import sys |
|
from typing import Any, List, Literal, Optional, Union |
|
|
|
from fastapi import HTTPException |
|
|
|
import litellm |
|
from litellm._logging import verbose_proxy_logger |
|
from litellm.integrations.custom_guardrail import ( |
|
CustomGuardrail, |
|
log_guardrail_information, |
|
) |
|
from litellm.litellm_core_utils.prompt_templates.common_utils import ( |
|
convert_content_list_to_str, |
|
) |
|
from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
get_async_httpx_client, |
|
httpxSpecialProvider, |
|
) |
|
from litellm.proxy._types import UserAPIKeyAuth |
|
from litellm.secret_managers.main import get_secret |
|
from litellm.types.guardrails import ( |
|
BedrockContentItem, |
|
BedrockRequest, |
|
BedrockTextContent, |
|
GuardrailEventHooks, |
|
) |
|
from litellm.types.llms.openai import AllMessageValues |
|
from litellm.types.utils import ModelResponse |
|
|
|
# Identifier string for this guardrail integration.
GUARDRAIL_NAME = "bedrock"
|
|
|
|
|
class BedrockGuardrail(CustomGuardrail, BaseAWSLLM):
    """Guardrail hook that checks chat content against a Bedrock guardrail.

    Request messages (and, for the post-call hook, the model response) are
    converted to the Bedrock guardrail payload, SigV4-signed and POSTed to the
    ``/guardrail/{id}/version/{version}/apply`` endpoint of ``bedrock-runtime``.
    When the service answers with ``action == "GUARDRAIL_INTERVENED"`` an
    ``HTTPException`` with status 400 is raised.
    """

    def __init__(
        self,
        guardrailIdentifier: Optional[str] = None,
        guardrailVersion: Optional[str] = None,
        **kwargs,
    ):
        """Create the guardrail.

        Args:
            guardrailIdentifier: Guardrail id interpolated into the request URL.
            guardrailVersion: Guardrail version interpolated into the request URL.
            **kwargs: Forwarded to the parent initializer and also kept as
                ``self.optional_params``; the ``aws_*`` settings used for
                credential loading are read from there.
        """
        self.async_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.GuardrailCallback
        )
        self.guardrailIdentifier = guardrailIdentifier
        self.guardrailVersion = guardrailVersion

        # Keep the raw kwargs around: credentials/region are looked up in this
        # dict on every guardrail request.
        self.optional_params = kwargs

        super().__init__(**kwargs)
        BaseAWSLLM.__init__(self)

    def convert_to_bedrock_format(
        self,
        messages: Optional[List[AllMessageValues]] = None,
        response: Optional[Union[Any, ModelResponse]] = None,
    ) -> BedrockRequest:
        """Build the Bedrock guardrail request body.

        Args:
            messages: Chat messages; each one becomes a text content item.
            response: Model response. When truthy the request ``source`` is
                ``"OUTPUT"``; if it is a ``litellm.ModelResponse`` the non-empty
                string content of each ``litellm.Choices`` entry is appended
                after the message items.

        Returns:
            A ``BedrockRequest`` with ``source`` set, and ``content`` set
            whenever ``messages`` is non-empty or ``response`` is a
            ``litellm.ModelResponse``.
        """
        bedrock_request: BedrockRequest = BedrockRequest(source="INPUT")
        bedrock_request_content: List[BedrockContentItem] = []

        if messages:
            bedrock_request_content.extend(
                BedrockContentItem(
                    text=BedrockTextContent(
                        text=convert_content_list_to_str(message=message)
                    )
                )
                for message in messages
            )
            bedrock_request["content"] = bedrock_request_content

        if response:
            bedrock_request["source"] = "OUTPUT"
            if isinstance(response, litellm.ModelResponse):
                for choice in response.choices:
                    if not isinstance(choice, litellm.Choices):
                        continue
                    content = choice.message.content
                    # Only non-empty plain-string content is forwarded.
                    if content and isinstance(content, str):
                        bedrock_request_content.append(
                            BedrockContentItem(text=BedrockTextContent(text=content))
                        )
                bedrock_request["content"] = bedrock_request_content

        return bedrock_request

    def _load_credentials(
        self,
    ):
        """Resolve AWS credentials and the region for a guardrail request.

        The ``aws_*`` settings are read from ``self.optional_params`` without
        modifying it. This method runs once per request, so consuming the
        settings (e.g. with ``dict.pop``) would leave every request after the
        first one without the configured credentials and region.

        Region resolution order: the ``aws_region_name`` setting, then the
        ``AWS_REGION`` secret, then the ``AWS_REGION_NAME`` secret, then
        ``"us-west-2"``.

        Returns:
            Tuple ``(credentials, aws_region_name)``.

        Raises:
            ImportError: If ``botocore`` cannot be imported.
        """
        try:
            from botocore.credentials import Credentials
        except ImportError as err:
            raise ImportError(
                "Missing boto3 to call bedrock. Run 'pip install boto3'."
            ) from err

        params = self.optional_params
        aws_secret_access_key = params.get("aws_secret_access_key", None)
        aws_access_key_id = params.get("aws_access_key_id", None)
        aws_session_token = params.get("aws_session_token", None)
        aws_region_name = params.get("aws_region_name", None)
        aws_role_name = params.get("aws_role_name", None)
        aws_session_name = params.get("aws_session_name", None)
        aws_profile_name = params.get("aws_profile_name", None)
        aws_web_identity_token = params.get("aws_web_identity_token", None)
        aws_sts_endpoint = params.get("aws_sts_endpoint", None)

        if aws_region_name is None:
            # Both secrets are looked up; a later string value overrides an
            # earlier one, so AWS_REGION wins over AWS_REGION_NAME.
            for secret_name in ("AWS_REGION_NAME", "AWS_REGION"):
                secret_value = get_secret(secret_name, None)
                if isinstance(secret_value, str):
                    aws_region_name = secret_value

        if aws_region_name is None:
            aws_region_name = "us-west-2"

        # get_credentials is not defined in this class (presumably inherited
        # from BaseAWSLLM).
        credentials: Credentials = self.get_credentials(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            aws_region_name=aws_region_name,
            aws_session_name=aws_session_name,
            aws_profile_name=aws_profile_name,
            aws_role_name=aws_role_name,
            aws_web_identity_token=aws_web_identity_token,
            aws_sts_endpoint=aws_sts_endpoint,
        )
        return credentials, aws_region_name

    def _prepare_request(
        self,
        credentials,
        data: dict,
        optional_params: dict,
        aws_region_name: str,
        extra_headers: Optional[dict] = None,
    ):
        """Build and SigV4-sign the POST request for the guardrail endpoint.

        Args:
            credentials: Credentials object handed to ``SigV4Auth``.
            data: JSON-serializable request body.
            optional_params: Accepted for interface compatibility; not used.
            aws_region_name: Region used in both the URL and the signature.
            extra_headers: Extra headers merged over the default
                ``Content-Type``. An ``Authorization`` entry replaces the
                SigV4 ``Authorization`` header after signing.

        Returns:
            The prepared request (result of ``AWSRequest.prepare()``).

        Raises:
            ImportError: If ``botocore`` cannot be imported.
        """
        try:
            from botocore.auth import SigV4Auth
            from botocore.awsrequest import AWSRequest
        except ImportError as err:
            raise ImportError(
                "Missing boto3 to call bedrock. Run 'pip install boto3'."
            ) from err

        sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name)
        api_base = f"https://bedrock-runtime.{aws_region_name}.amazonaws.com/guardrail/{self.guardrailIdentifier}/version/{self.guardrailVersion}/apply"

        encoded_data = json.dumps(data).encode("utf-8")
        headers = {"Content-Type": "application/json", **(extra_headers or {})}

        request = AWSRequest(
            method="POST", url=api_base, data=encoded_data, headers=headers
        )
        sigv4.add_auth(request)
        # A caller-supplied Authorization header takes precedence over the
        # signature that was just added.
        if extra_headers is not None and "Authorization" in extra_headers:
            request.headers["Authorization"] = extra_headers["Authorization"]

        return request.prepare()

    async def make_bedrock_api_request(
        self, kwargs: dict, response: Optional[Union[Any, litellm.ModelResponse]] = None
    ):
        """Send the content to the Bedrock guardrail and enforce its verdict.

        Args:
            kwargs: Request data; ``kwargs.get("messages")`` supplies the
                messages to check.
            response: Optional model response to check along with the messages.

        Raises:
            HTTPException: Status 400 when the guardrail response has
                ``action == "GUARDRAIL_INTERVENED"``.
        """
        credentials, aws_region_name = self._load_credentials()
        bedrock_request_data: dict = dict(
            self.convert_to_bedrock_format(
                messages=kwargs.get("messages"), response=response
            )
        )
        bedrock_request_data.update(
            self.get_guardrail_dynamic_request_body_params(request_data=kwargs)
        )
        prepared_request = self._prepare_request(
            credentials=credentials,
            data=bedrock_request_data,
            optional_params=self.optional_params,
            aws_region_name=aws_region_name,
        )
        # NOTE(review): the logged headers are the signed request headers and
        # likely contain the SigV4 Authorization value (and a session token
        # header when temporary credentials are used) - consider redacting.
        verbose_proxy_logger.debug(
            "Bedrock AI request body: %s, url %s, headers: %s",
            bedrock_request_data,
            prepared_request.url,
            prepared_request.headers,
        )

        # Separate name so the `response` argument (the model response) is not
        # shadowed by the HTTP response.
        guardrail_response = await self.async_handler.post(
            url=prepared_request.url,
            data=prepared_request.body,
            headers=prepared_request.headers,
        )
        verbose_proxy_logger.debug("Bedrock AI response: %s", guardrail_response.text)

        if guardrail_response.status_code != 200:
            # Non-200 responses are only logged; the request is not blocked.
            verbose_proxy_logger.error(
                "Bedrock AI: error in response. Status code: %s, response: %s",
                guardrail_response.status_code,
                guardrail_response.text,
            )
            return

        _json_response = guardrail_response.json()
        if _json_response.get("action") == "GUARDRAIL_INTERVENED":
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "Violated guardrail policy",
                    "bedrock_guardrail_response": _json_response,
                },
            )

    @log_guardrail_information
    async def async_moderation_hook(
        self,
        data: dict,
        user_api_key_dict: UserAPIKeyAuth,
        call_type: Literal[
            "completion",
            "embeddings",
            "image_generation",
            "moderation",
            "audio_transcription",
        ],
    ):
        """Run the guardrail on the request messages for the during-call event.

        Does nothing unless ``should_run_guardrail`` returns ``True`` for
        ``GuardrailEventHooks.during_call``. When ``data`` has no ``messages``
        entry a warning is logged and the guardrail is not called.

        Args:
            data: Request data; ``data.get("messages")`` must not be ``None``
                for the guardrail to run.
            user_api_key_dict: Not used by this hook.
            call_type: Not used by this hook.

        Raises:
            HTTPException: Propagated from ``make_bedrock_api_request`` when
                the guardrail intervenes.
        """
        from litellm.proxy.common_utils.callback_utils import (
            add_guardrail_to_applied_guardrails_header,
        )

        event_type: GuardrailEventHooks = GuardrailEventHooks.during_call
        if self.should_run_guardrail(data=data, event_type=event_type) is not True:
            return

        new_messages: Optional[List[dict]] = data.get("messages")
        if new_messages is None:
            verbose_proxy_logger.warning(
                "Bedrock AI: not running guardrail. No messages in data"
            )
            return

        await self.make_bedrock_api_request(kwargs=data)
        add_guardrail_to_applied_guardrails_header(
            request_data=data, guardrail_name=self.guardrail_name
        )

    @log_guardrail_information
    async def async_post_call_success_hook(
        self,
        data: dict,
        user_api_key_dict: UserAPIKeyAuth,
        response,
    ):
        """Run the guardrail on the messages and model response after the call.

        Does nothing unless ``should_run_guardrail`` returns ``True`` for
        ``GuardrailEventHooks.post_call``. When ``data`` has no ``messages``
        entry a warning is logged and the guardrail is not called.

        Args:
            data: Request data; ``data.get("messages")`` must not be ``None``
                for the guardrail to run.
            user_api_key_dict: Not used by this hook.
            response: Model response forwarded to ``make_bedrock_api_request``.

        Raises:
            HTTPException: Propagated from ``make_bedrock_api_request`` when
                the guardrail intervenes.
        """
        from litellm.proxy.common_utils.callback_utils import (
            add_guardrail_to_applied_guardrails_header,
        )

        if (
            self.should_run_guardrail(
                data=data, event_type=GuardrailEventHooks.post_call
            )
            is not True
        ):
            return

        new_messages: Optional[List[dict]] = data.get("messages")
        if new_messages is None:
            verbose_proxy_logger.warning(
                "Bedrock AI: not running guardrail. No messages in data"
            )
            return

        await self.make_bedrock_api_request(kwargs=data, response=response)
        add_guardrail_to_applied_guardrails_header(
            request_data=data, guardrail_name=self.guardrail_name
        )
|
|