from base64 import b64decode, b64encode from http import HTTPStatus from uuid import UUID from kubernetes import client, config from kubernetes.client.rest import ApiException from loguru import logger class KubernetesSecretManager: """A class for managing Kubernetes secrets.""" def __init__(self, namespace: str = "langflow"): """Initialize the KubernetesSecretManager class. Args: namespace (str): The namespace in which to perform secret operations. """ config.load_kube_config() self.namespace = namespace # initialize the Kubernetes API client self.core_api = client.CoreV1Api() def create_secret( self, name: str, data: dict, secret_type: str = "Opaque", # noqa: S107 ): """Create a new secret in the specified namespace. Args: name (str): The name of the secret to create. data (dict): A dictionary containing the key-value pairs for the secret data. secret_type (str, optional): The type of secret to create. Defaults to 'Opaque'. Returns: V1Secret: The created secret object. """ encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()} secret_metadata = client.V1ObjectMeta(name=name) secret = client.V1Secret( api_version="v1", kind="Secret", metadata=secret_metadata, type=secret_type, data=encoded_data ) return self.core_api.create_namespaced_secret(self.namespace, secret) def upsert_secret(self, secret_name: str, data: dict): """Upsert a secret in the specified namespace. If the secret doesn't exist, it will be created. If it exists, it will be updated with new data while preserving existing keys. :param secret_name: Name of the secret :param new_data: Dictionary containing new key-value pairs for the secret :return: Created or updated secret object """ try: # Try to read the existing secret existing_secret = self.core_api.read_namespaced_secret(secret_name, self.namespace) # If secret exists, update it existing_data = {k: b64decode(v).decode() for k, v in existing_secret.data.items()} existing_data.update(data) # Encode all data to base64 encoded_data = {k: b64encode(v.encode()).decode() for k, v in existing_data.items()} # Update the existing secret existing_secret.data = encoded_data return self.core_api.replace_namespaced_secret(secret_name, self.namespace, existing_secret) except ApiException as e: if e.status == HTTPStatus.NOT_FOUND: # Secret doesn't exist, create a new one return self.create_secret(secret_name, data) logger.exception(f"Error upserting secret {secret_name}") raise def get_secret(self, name: str) -> dict | None: """Read a secret from the specified namespace. Args: name (str): The name of the secret to read. Returns: V1Secret: The secret object. """ try: secret = self.core_api.read_namespaced_secret(name, self.namespace) return {k: b64decode(v).decode() for k, v in secret.data.items()} except ApiException as e: if e.status == HTTPStatus.NOT_FOUND: return None raise def update_secret(self, name: str, data: dict): """Update an existing secret in the specified namespace. Args: name (str): The name of the secret to update. data (dict): A dictionary containing the key-value pairs for the updated secret data. Returns: V1Secret: The updated secret object. """ # Get the existing secret secret = self.core_api.read_namespaced_secret(name, self.namespace) if secret is None: raise ApiException(status=404, reason="Not Found", msg="Secret not found") # Update the secret data encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()} secret.data.update(encoded_data) # Update the secret in Kubernetes return self.core_api.replace_namespaced_secret(name, self.namespace, secret) def delete_secret_key(self, name: str, key: str): """Delete a key from the specified secret in the namespace. Args: name (str): The name of the secret. key (str): The key to delete from the secret. Returns: V1Secret: The updated secret object. """ # Get the existing secret secret = self.core_api.read_namespaced_secret(name, self.namespace) if secret is None: raise ApiException(status=404, reason="Not Found", msg="Secret not found") # Delete the key from the secret data if key in secret.data: del secret.data[key] else: raise ApiException(status=404, reason="Not Found", msg="Key not found in the secret") # Update the secret in Kubernetes return self.core_api.replace_namespaced_secret(name, self.namespace, secret) def delete_secret(self, name: str): """Delete a secret from the specified namespace. Args: name (str): The name of the secret to delete. Returns: V1Status: The status object indicating the success or failure of the operation. """ return self.core_api.delete_namespaced_secret(name, self.namespace) # utility function to encode user_id to base64 lower case and numbers only # this is required by kubernetes secret name restrictions def encode_user_id(user_id: UUID | str) -> str: # Handle UUID if isinstance(user_id, UUID): return f"uuid-{str(user_id).lower()}"[:253] # Convert string to lowercase user_id_ = str(user_id).lower() # If the user_id looks like an email, replace @ and . with allowed characters if "@" in user_id_ or "." in user_id_: user_id_ = user_id_.replace("@", "-at-").replace(".", "-dot-") # Encode the user_id to base64 # encoded = base64.b64encode(user_id.encode("utf-8")).decode("utf-8") # Replace characters not allowed in Kubernetes names user_id_ = user_id_.replace("+", "-").replace("/", "_").rstrip("=") # Ensure the name starts with an alphanumeric character if not user_id_[0].isalnum(): user_id_ = "a-" + user_id_ # Truncate to 253 characters (Kubernetes name length limit) user_id_ = user_id_[:253] if not all(c.isalnum() or c in "-_" for c in user_id_): msg = f"Invalid user_id: {user_id_}" raise ValueError(msg) # Ensure the name ends with an alphanumeric character while not user_id_[-1].isalnum(): user_id_ = user_id_[:-1] return user_id_