File size: 6,222 Bytes
e3278e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Union
import httpx
from litellm import verbose_logger
class BaseSecretManager(ABC):
"""
Abstract base class for secret management implementations.
"""
@abstractmethod
async def async_read_secret(
self,
secret_name: str,
optional_params: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
) -> Optional[str]:
"""
Asynchronously read a secret from the secret manager.
Args:
secret_name (str): Name/path of the secret to read
optional_params (Optional[dict]): Additional parameters specific to the secret manager
timeout (Optional[Union[float, httpx.Timeout]]): Request timeout
Returns:
Optional[str]: The secret value if found, None otherwise
"""
pass
@abstractmethod
def sync_read_secret(
self,
secret_name: str,
optional_params: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
) -> Optional[str]:
"""
Synchronously read a secret from the secret manager.
Args:
secret_name (str): Name/path of the secret to read
optional_params (Optional[dict]): Additional parameters specific to the secret manager
timeout (Optional[Union[float, httpx.Timeout]]): Request timeout
Returns:
Optional[str]: The secret value if found, None otherwise
"""
pass
@abstractmethod
async def async_write_secret(
self,
secret_name: str,
secret_value: str,
description: Optional[str] = None,
optional_params: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
) -> Dict[str, Any]:
"""
Asynchronously write a secret to the secret manager.
Args:
secret_name (str): Name/path of the secret to write
secret_value (str): Value to store
description (Optional[str]): Description of the secret. Some secret managers allow storing a description with the secret.
optional_params (Optional[dict]): Additional parameters specific to the secret manager
timeout (Optional[Union[float, httpx.Timeout]]): Request timeout
Returns:
Dict[str, Any]: Response from the secret manager containing write operation details
"""
pass
@abstractmethod
async def async_delete_secret(
self,
secret_name: str,
recovery_window_in_days: Optional[int] = 7,
optional_params: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
) -> dict:
"""
Async function to delete a secret from the secret manager
Args:
secret_name: Name of the secret to delete
recovery_window_in_days: Number of days before permanent deletion (default: 7)
optional_params: Additional parameters specific to the secret manager
timeout: Request timeout
Returns:
dict: Response from the secret manager containing deletion details
"""
pass
async def async_rotate_secret(
self,
current_secret_name: str,
new_secret_name: str,
new_secret_value: str,
optional_params: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
) -> dict:
"""
Async function to rotate a secret by creating a new one and deleting the old one.
This allows for both value and name changes during rotation.
Args:
current_secret_name: Current name of the secret
new_secret_name: New name for the secret
new_secret_value: New value for the secret
optional_params: Additional AWS parameters
timeout: Request timeout
Returns:
dict: Response containing the new secret details
Raises:
ValueError: If the secret doesn't exist or if there's an HTTP error
"""
try:
# First verify the old secret exists
old_secret = await self.async_read_secret(
secret_name=current_secret_name,
optional_params=optional_params,
timeout=timeout,
)
if old_secret is None:
raise ValueError(f"Current secret {current_secret_name} not found")
# Create new secret with new name and value
create_response = await self.async_write_secret(
secret_name=new_secret_name,
secret_value=new_secret_value,
description=f"Rotated from {current_secret_name}",
optional_params=optional_params,
timeout=timeout,
)
# Verify new secret was created successfully
new_secret = await self.async_read_secret(
secret_name=new_secret_name,
optional_params=optional_params,
timeout=timeout,
)
if new_secret is None:
raise ValueError(f"Failed to verify new secret {new_secret_name}")
# If everything is successful, delete the old secret
await self.async_delete_secret(
secret_name=current_secret_name,
recovery_window_in_days=7, # Keep for recovery if needed
optional_params=optional_params,
timeout=timeout,
)
return create_response
except httpx.HTTPStatusError as err:
verbose_logger.exception(
"Error rotating secret in AWS Secrets Manager: %s",
str(err.response.text),
)
raise ValueError(f"HTTP error occurred: {err.response.text}")
except httpx.TimeoutException:
raise ValueError("Timeout error occurred")
except Exception as e:
verbose_logger.exception(
"Error rotating secret in AWS Secrets Manager: %s", str(e)
)
raise
|