Spaces:
Paused
Paused
| """ | |
| This is a file for the AWS Secret Manager Integration | |
| Handles Async Operations for: | |
| - Read Secret | |
| - Write Secret | |
| - Delete Secret | |
| Relevant issue: https://github.com/BerriAI/litellm/issues/1883 | |
| Requires: | |
| * `os.environ["AWS_REGION_NAME"], | |
| * `pip install boto3>=1.28.57` | |
| """ | |
| import json | |
| import os | |
| from typing import Any, Optional, Union | |
| import httpx | |
| import litellm | |
| from litellm._logging import verbose_logger | |
| from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM | |
| from litellm.llms.custom_httpx.http_handler import ( | |
| _get_httpx_client, | |
| get_async_httpx_client, | |
| ) | |
| from litellm.proxy._types import KeyManagementSystem | |
| from litellm.types.llms.custom_http import httpxSpecialProvider | |
| from .base_secret_manager import BaseSecretManager | |
| class AWSSecretsManagerV2(BaseAWSLLM, BaseSecretManager): | |
| def __init__(self, **kwargs): | |
| BaseSecretManager.__init__(self, **kwargs) | |
| BaseAWSLLM.__init__(self, **kwargs) | |
| def validate_environment(cls): | |
| if "AWS_REGION_NAME" not in os.environ: | |
| raise ValueError("Missing required environment variable - AWS_REGION_NAME") | |
| def load_aws_secret_manager(cls, use_aws_secret_manager: Optional[bool]): | |
| """ | |
| Initialize AWSSecretsManagerV2 and sets litellm.secret_manager_client = AWSSecretsManagerV2() and litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER | |
| """ | |
| if use_aws_secret_manager is None or use_aws_secret_manager is False: | |
| return | |
| try: | |
| cls.validate_environment() | |
| litellm.secret_manager_client = cls() | |
| litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER | |
| except Exception as e: | |
| raise e | |
| async def async_read_secret( | |
| self, | |
| secret_name: str, | |
| optional_params: Optional[dict] = None, | |
| timeout: Optional[Union[float, httpx.Timeout]] = None, | |
| primary_secret_name: Optional[str] = None, | |
| ) -> Optional[str]: | |
| """ | |
| Async function to read a secret from AWS Secrets Manager | |
| Returns: | |
| str: Secret value | |
| Raises: | |
| ValueError: If the secret is not found or an HTTP error occurs | |
| """ | |
| if primary_secret_name: | |
| return await self.async_read_secret_from_primary_secret( | |
| secret_name=secret_name, primary_secret_name=primary_secret_name | |
| ) | |
| endpoint_url, headers, body = self._prepare_request( | |
| action="GetSecretValue", | |
| secret_name=secret_name, | |
| optional_params=optional_params, | |
| ) | |
| async_client = get_async_httpx_client( | |
| llm_provider=httpxSpecialProvider.SecretManager, | |
| params={"timeout": timeout}, | |
| ) | |
| try: | |
| response = await async_client.post( | |
| url=endpoint_url, headers=headers, data=body.decode("utf-8") | |
| ) | |
| response.raise_for_status() | |
| return response.json()["SecretString"] | |
| except httpx.TimeoutException: | |
| raise ValueError("Timeout error occurred") | |
| except Exception as e: | |
| verbose_logger.exception( | |
| "Error reading secret='%s' from AWS Secrets Manager: %s", | |
| secret_name, | |
| str(e), | |
| ) | |
| return None | |
| def sync_read_secret( | |
| self, | |
| secret_name: str, | |
| optional_params: Optional[dict] = None, | |
| timeout: Optional[Union[float, httpx.Timeout]] = None, | |
| primary_secret_name: Optional[str] = None, | |
| ) -> Optional[str]: | |
| """ | |
| Sync function to read a secret from AWS Secrets Manager | |
| Done for backwards compatibility with existing codebase, since get_secret is a sync function | |
| """ | |
| # self._prepare_request uses these env vars, we cannot read them from AWS Secrets Manager. If we do we'd get stuck in an infinite loop | |
| if secret_name in [ | |
| "AWS_ACCESS_KEY_ID", | |
| "AWS_SECRET_ACCESS_KEY", | |
| "AWS_REGION_NAME", | |
| "AWS_REGION", | |
| "AWS_BEDROCK_RUNTIME_ENDPOINT", | |
| ]: | |
| return os.getenv(secret_name) | |
| if primary_secret_name: | |
| return self.sync_read_secret_from_primary_secret( | |
| secret_name=secret_name, primary_secret_name=primary_secret_name | |
| ) | |
| endpoint_url, headers, body = self._prepare_request( | |
| action="GetSecretValue", | |
| secret_name=secret_name, | |
| optional_params=optional_params, | |
| ) | |
| sync_client = _get_httpx_client( | |
| params={"timeout": timeout}, | |
| ) | |
| try: | |
| response = sync_client.post( | |
| url=endpoint_url, headers=headers, data=body.decode("utf-8") | |
| ) | |
| return response.json()["SecretString"] | |
| except httpx.TimeoutException: | |
| raise ValueError("Timeout error occurred") | |
| except httpx.HTTPStatusError as e: | |
| verbose_logger.exception( | |
| "Error reading secret='%s' from AWS Secrets Manager: %s, %s", | |
| secret_name, | |
| str(e.response.text), | |
| str(e.response.status_code), | |
| ) | |
| except Exception as e: | |
| verbose_logger.exception( | |
| "Error reading secret='%s' from AWS Secrets Manager: %s", | |
| secret_name, | |
| str(e), | |
| ) | |
| return None | |
| def _parse_primary_secret(self, primary_secret_json_str: Optional[str]) -> dict: | |
| """ | |
| Parse the primary secret JSON string into a dictionary | |
| Args: | |
| primary_secret_json_str: JSON string containing key-value pairs | |
| Returns: | |
| Dictionary of key-value pairs from the primary secret | |
| """ | |
| return json.loads(primary_secret_json_str or "{}") | |
| def sync_read_secret_from_primary_secret( | |
| self, secret_name: str, primary_secret_name: str | |
| ) -> Optional[str]: | |
| """ | |
| Read a secret from the primary secret | |
| """ | |
| primary_secret_json_str = self.sync_read_secret(secret_name=primary_secret_name) | |
| primary_secret_kv_pairs = self._parse_primary_secret(primary_secret_json_str) | |
| return primary_secret_kv_pairs.get(secret_name) | |
| async def async_read_secret_from_primary_secret( | |
| self, secret_name: str, primary_secret_name: str | |
| ) -> Optional[str]: | |
| """ | |
| Read a secret from the primary secret | |
| """ | |
| primary_secret_json_str = await self.async_read_secret( | |
| secret_name=primary_secret_name | |
| ) | |
| primary_secret_kv_pairs = self._parse_primary_secret(primary_secret_json_str) | |
| return primary_secret_kv_pairs.get(secret_name) | |
| async def async_write_secret( | |
| self, | |
| secret_name: str, | |
| secret_value: str, | |
| description: Optional[str] = None, | |
| optional_params: Optional[dict] = None, | |
| timeout: Optional[Union[float, httpx.Timeout]] = None, | |
| ) -> dict: | |
| """ | |
| Async function to write a secret to AWS Secrets Manager | |
| Args: | |
| secret_name: Name of the secret | |
| secret_value: Value to store (can be a JSON string) | |
| description: Optional description for the secret | |
| optional_params: Additional AWS parameters | |
| timeout: Request timeout | |
| """ | |
| import uuid | |
| # Prepare the request data | |
| data = {"Name": secret_name, "SecretString": secret_value} | |
| if description: | |
| data["Description"] = description | |
| data["ClientRequestToken"] = str(uuid.uuid4()) | |
| endpoint_url, headers, body = self._prepare_request( | |
| action="CreateSecret", | |
| secret_name=secret_name, | |
| secret_value=secret_value, | |
| optional_params=optional_params, | |
| request_data=data, # Pass the complete request data | |
| ) | |
| async_client = get_async_httpx_client( | |
| llm_provider=httpxSpecialProvider.SecretManager, | |
| params={"timeout": timeout}, | |
| ) | |
| try: | |
| response = await async_client.post( | |
| url=endpoint_url, headers=headers, data=body.decode("utf-8") | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as err: | |
| raise ValueError(f"HTTP error occurred: {err.response.text}") | |
| except httpx.TimeoutException: | |
| raise ValueError("Timeout error occurred") | |
| async def async_delete_secret( | |
| self, | |
| secret_name: str, | |
| recovery_window_in_days: Optional[int] = 7, | |
| optional_params: Optional[dict] = None, | |
| timeout: Optional[Union[float, httpx.Timeout]] = None, | |
| ) -> dict: | |
| """ | |
| Async function to delete a secret from AWS Secrets Manager | |
| Args: | |
| secret_name: Name of the secret to delete | |
| recovery_window_in_days: Number of days before permanent deletion (default: 7) | |
| optional_params: Additional AWS parameters | |
| timeout: Request timeout | |
| Returns: | |
| dict: Response from AWS Secrets Manager containing deletion details | |
| """ | |
| # Prepare the request data | |
| data = { | |
| "SecretId": secret_name, | |
| "RecoveryWindowInDays": recovery_window_in_days, | |
| } | |
| endpoint_url, headers, body = self._prepare_request( | |
| action="DeleteSecret", | |
| secret_name=secret_name, | |
| optional_params=optional_params, | |
| request_data=data, | |
| ) | |
| async_client = get_async_httpx_client( | |
| llm_provider=httpxSpecialProvider.SecretManager, | |
| params={"timeout": timeout}, | |
| ) | |
| try: | |
| response = await async_client.post( | |
| url=endpoint_url, headers=headers, data=body.decode("utf-8") | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as err: | |
| raise ValueError(f"HTTP error occurred: {err.response.text}") | |
| except httpx.TimeoutException: | |
| raise ValueError("Timeout error occurred") | |
| def _prepare_request( | |
| self, | |
| action: str, # "GetSecretValue" or "PutSecretValue" | |
| secret_name: str, | |
| secret_value: Optional[str] = None, | |
| optional_params: Optional[dict] = None, | |
| request_data: Optional[dict] = None, | |
| ) -> tuple[str, Any, bytes]: | |
| """Prepare the AWS Secrets Manager request""" | |
| try: | |
| from botocore.auth import SigV4Auth | |
| from botocore.awsrequest import AWSRequest | |
| except ImportError: | |
| raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") | |
| optional_params = optional_params or {} | |
| boto3_credentials_info = self._get_boto_credentials_from_optional_params( | |
| optional_params | |
| ) | |
| # Get endpoint | |
| _, endpoint_url = self.get_runtime_endpoint( | |
| api_base=None, | |
| aws_bedrock_runtime_endpoint=boto3_credentials_info.aws_bedrock_runtime_endpoint, | |
| aws_region_name=boto3_credentials_info.aws_region_name, | |
| ) | |
| endpoint_url = endpoint_url.replace("bedrock-runtime", "secretsmanager") | |
| # Use provided request_data if available, otherwise build default data | |
| if request_data: | |
| data = request_data | |
| else: | |
| data = {"SecretId": secret_name} | |
| if secret_value and action == "PutSecretValue": | |
| data["SecretString"] = secret_value | |
| body = json.dumps(data).encode("utf-8") | |
| headers = { | |
| "Content-Type": "application/x-amz-json-1.1", | |
| "X-Amz-Target": f"secretsmanager.{action}", | |
| } | |
| # Sign request | |
| request = AWSRequest( | |
| method="POST", url=endpoint_url, data=body, headers=headers | |
| ) | |
| SigV4Auth( | |
| boto3_credentials_info.credentials, | |
| "secretsmanager", | |
| boto3_credentials_info.aws_region_name, | |
| ).add_auth(request) | |
| prepped = request.prepare() | |
| return endpoint_url, prepped.headers, body | |
| # if __name__ == "__main__": | |
| # print("loading aws secret manager v2") | |
| # aws_secret_manager_v2 = AWSSecretsManagerV2() | |
| # print("writing secret to aws secret manager v2") | |
| # asyncio.run(aws_secret_manager_v2.async_write_secret(secret_name="test_secret_3", secret_value="test_value_2")) | |
| # print("reading secret from aws secret manager v2") | |