Raju2024's picture
Upload 1072 files
e3278e4 verified
raw
history blame
11.4 kB
# +-------------------------------------------------------------+
#
# Use Bedrock Guardrails for your LLM calls
#
# +-------------------------------------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
import os
import sys
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import json
import sys
from typing import Any, List, Literal, Optional, Union
from fastapi import HTTPException
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.integrations.custom_guardrail import (
CustomGuardrail,
log_guardrail_information,
)
from litellm.litellm_core_utils.prompt_templates.common_utils import (
convert_content_list_to_str,
)
from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.secret_managers.main import get_secret
from litellm.types.guardrails import (
BedrockContentItem,
BedrockRequest,
BedrockTextContent,
GuardrailEventHooks,
)
from litellm.types.llms.openai import AllMessageValues
from litellm.types.utils import ModelResponse
GUARDRAIL_NAME = "bedrock"
class BedrockGuardrail(CustomGuardrail, BaseAWSLLM):
def __init__(
self,
guardrailIdentifier: Optional[str] = None,
guardrailVersion: Optional[str] = None,
**kwargs,
):
self.async_handler = get_async_httpx_client(
llm_provider=httpxSpecialProvider.GuardrailCallback
)
self.guardrailIdentifier = guardrailIdentifier
self.guardrailVersion = guardrailVersion
# store kwargs as optional_params
self.optional_params = kwargs
super().__init__(**kwargs)
BaseAWSLLM.__init__(self)
def convert_to_bedrock_format(
self,
messages: Optional[List[AllMessageValues]] = None,
response: Optional[Union[Any, ModelResponse]] = None,
) -> BedrockRequest:
bedrock_request: BedrockRequest = BedrockRequest(source="INPUT")
bedrock_request_content: List[BedrockContentItem] = []
if messages:
for message in messages:
bedrock_content_item = BedrockContentItem(
text=BedrockTextContent(
text=convert_content_list_to_str(message=message)
)
)
bedrock_request_content.append(bedrock_content_item)
bedrock_request["content"] = bedrock_request_content
if response:
bedrock_request["source"] = "OUTPUT"
if isinstance(response, litellm.ModelResponse):
for choice in response.choices:
if isinstance(choice, litellm.Choices):
if choice.message.content and isinstance(
choice.message.content, str
):
bedrock_content_item = BedrockContentItem(
text=BedrockTextContent(text=choice.message.content)
)
bedrock_request_content.append(bedrock_content_item)
bedrock_request["content"] = bedrock_request_content
return bedrock_request
#### CALL HOOKS - proxy only ####
def _load_credentials(
self,
):
try:
from botocore.credentials import Credentials
except ImportError:
raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
## CREDENTIALS ##
# pop aws_secret_access_key, aws_access_key_id, aws_session_token, aws_region_name from kwargs, since completion calls fail with them
aws_secret_access_key = self.optional_params.pop("aws_secret_access_key", None)
aws_access_key_id = self.optional_params.pop("aws_access_key_id", None)
aws_session_token = self.optional_params.pop("aws_session_token", None)
aws_region_name = self.optional_params.pop("aws_region_name", None)
aws_role_name = self.optional_params.pop("aws_role_name", None)
aws_session_name = self.optional_params.pop("aws_session_name", None)
aws_profile_name = self.optional_params.pop("aws_profile_name", None)
self.optional_params.pop(
"aws_bedrock_runtime_endpoint", None
) # https://bedrock-runtime.{region_name}.amazonaws.com
aws_web_identity_token = self.optional_params.pop(
"aws_web_identity_token", None
)
aws_sts_endpoint = self.optional_params.pop("aws_sts_endpoint", None)
### SET REGION NAME ###
if aws_region_name is None:
# check env #
litellm_aws_region_name = get_secret("AWS_REGION_NAME", None)
if litellm_aws_region_name is not None and isinstance(
litellm_aws_region_name, str
):
aws_region_name = litellm_aws_region_name
standard_aws_region_name = get_secret("AWS_REGION", None)
if standard_aws_region_name is not None and isinstance(
standard_aws_region_name, str
):
aws_region_name = standard_aws_region_name
if aws_region_name is None:
aws_region_name = "us-west-2"
credentials: Credentials = self.get_credentials(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
aws_region_name=aws_region_name,
aws_session_name=aws_session_name,
aws_profile_name=aws_profile_name,
aws_role_name=aws_role_name,
aws_web_identity_token=aws_web_identity_token,
aws_sts_endpoint=aws_sts_endpoint,
)
return credentials, aws_region_name
def _prepare_request(
self,
credentials,
data: dict,
optional_params: dict,
aws_region_name: str,
extra_headers: Optional[dict] = None,
):
try:
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
except ImportError:
raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name)
api_base = f"https://bedrock-runtime.{aws_region_name}.amazonaws.com/guardrail/{self.guardrailIdentifier}/version/{self.guardrailVersion}/apply"
encoded_data = json.dumps(data).encode("utf-8")
headers = {"Content-Type": "application/json"}
if extra_headers is not None:
headers = {"Content-Type": "application/json", **extra_headers}
request = AWSRequest(
method="POST", url=api_base, data=encoded_data, headers=headers
)
sigv4.add_auth(request)
if (
extra_headers is not None and "Authorization" in extra_headers
): # prevent sigv4 from overwriting the auth header
request.headers["Authorization"] = extra_headers["Authorization"]
prepped_request = request.prepare()
return prepped_request
async def make_bedrock_api_request(
self, kwargs: dict, response: Optional[Union[Any, litellm.ModelResponse]] = None
):
credentials, aws_region_name = self._load_credentials()
bedrock_request_data: dict = dict(
self.convert_to_bedrock_format(
messages=kwargs.get("messages"), response=response
)
)
bedrock_request_data.update(
self.get_guardrail_dynamic_request_body_params(request_data=kwargs)
)
prepared_request = self._prepare_request(
credentials=credentials,
data=bedrock_request_data,
optional_params=self.optional_params,
aws_region_name=aws_region_name,
)
verbose_proxy_logger.debug(
"Bedrock AI request body: %s, url %s, headers: %s",
bedrock_request_data,
prepared_request.url,
prepared_request.headers,
)
response = await self.async_handler.post(
url=prepared_request.url,
data=prepared_request.body, # type: ignore
headers=prepared_request.headers, # type: ignore
)
verbose_proxy_logger.debug("Bedrock AI response: %s", response.text)
if response.status_code == 200:
# check if the response was flagged
_json_response = response.json()
if _json_response.get("action") == "GUARDRAIL_INTERVENED":
raise HTTPException(
status_code=400,
detail={
"error": "Violated guardrail policy",
"bedrock_guardrail_response": _json_response,
},
)
else:
verbose_proxy_logger.error(
"Bedrock AI: error in response. Status code: %s, response: %s",
response.status_code,
response.text,
)
@log_guardrail_information
async def async_moderation_hook( ### 👈 KEY CHANGE ###
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: Literal[
"completion",
"embeddings",
"image_generation",
"moderation",
"audio_transcription",
],
):
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
event_type: GuardrailEventHooks = GuardrailEventHooks.during_call
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
return
new_messages: Optional[List[dict]] = data.get("messages")
if new_messages is not None:
await self.make_bedrock_api_request(kwargs=data)
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
else:
verbose_proxy_logger.warning(
"Bedrock AI: not running guardrail. No messages in data"
)
pass
@log_guardrail_information
async def async_post_call_success_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
response,
):
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
from litellm.types.guardrails import GuardrailEventHooks
if (
self.should_run_guardrail(
data=data, event_type=GuardrailEventHooks.post_call
)
is not True
):
return
new_messages: Optional[List[dict]] = data.get("messages")
if new_messages is not None:
await self.make_bedrock_api_request(kwargs=data, response=response)
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
else:
verbose_proxy_logger.warning(
"Bedrock AI: not running guardrail. No messages in data"
)