Spaces:
Running
Running
from base64 import b64decode, b64encode | |
from http import HTTPStatus | |
from uuid import UUID | |
from kubernetes import client, config | |
from kubernetes.client.rest import ApiException | |
from loguru import logger | |
class KubernetesSecretManager: | |
"""A class for managing Kubernetes secrets.""" | |
def __init__(self, namespace: str = "langflow"): | |
"""Initialize the KubernetesSecretManager class. | |
Args: | |
namespace (str): The namespace in which to perform secret operations. | |
""" | |
config.load_kube_config() | |
self.namespace = namespace | |
# initialize the Kubernetes API client | |
self.core_api = client.CoreV1Api() | |
def create_secret( | |
self, | |
name: str, | |
data: dict, | |
secret_type: str = "Opaque", # noqa: S107 | |
): | |
"""Create a new secret in the specified namespace. | |
Args: | |
name (str): The name of the secret to create. | |
data (dict): A dictionary containing the key-value pairs for the secret data. | |
secret_type (str, optional): The type of secret to create. Defaults to 'Opaque'. | |
Returns: | |
V1Secret: The created secret object. | |
""" | |
encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()} | |
secret_metadata = client.V1ObjectMeta(name=name) | |
secret = client.V1Secret( | |
api_version="v1", kind="Secret", metadata=secret_metadata, type=secret_type, data=encoded_data | |
) | |
return self.core_api.create_namespaced_secret(self.namespace, secret) | |
def upsert_secret(self, secret_name: str, data: dict): | |
"""Upsert a secret in the specified namespace. | |
If the secret doesn't exist, it will be created. | |
If it exists, it will be updated with new data while preserving existing keys. | |
:param secret_name: Name of the secret | |
:param new_data: Dictionary containing new key-value pairs for the secret | |
:return: Created or updated secret object | |
""" | |
try: | |
# Try to read the existing secret | |
existing_secret = self.core_api.read_namespaced_secret(secret_name, self.namespace) | |
# If secret exists, update it | |
existing_data = {k: b64decode(v).decode() for k, v in existing_secret.data.items()} | |
existing_data.update(data) | |
# Encode all data to base64 | |
encoded_data = {k: b64encode(v.encode()).decode() for k, v in existing_data.items()} | |
# Update the existing secret | |
existing_secret.data = encoded_data | |
return self.core_api.replace_namespaced_secret(secret_name, self.namespace, existing_secret) | |
except ApiException as e: | |
if e.status == HTTPStatus.NOT_FOUND: | |
# Secret doesn't exist, create a new one | |
return self.create_secret(secret_name, data) | |
logger.exception(f"Error upserting secret {secret_name}") | |
raise | |
def get_secret(self, name: str) -> dict | None: | |
"""Read a secret from the specified namespace. | |
Args: | |
name (str): The name of the secret to read. | |
Returns: | |
V1Secret: The secret object. | |
""" | |
try: | |
secret = self.core_api.read_namespaced_secret(name, self.namespace) | |
return {k: b64decode(v).decode() for k, v in secret.data.items()} | |
except ApiException as e: | |
if e.status == HTTPStatus.NOT_FOUND: | |
return None | |
raise | |
def update_secret(self, name: str, data: dict): | |
"""Update an existing secret in the specified namespace. | |
Args: | |
name (str): The name of the secret to update. | |
data (dict): A dictionary containing the key-value pairs for the updated secret data. | |
Returns: | |
V1Secret: The updated secret object. | |
""" | |
# Get the existing secret | |
secret = self.core_api.read_namespaced_secret(name, self.namespace) | |
if secret is None: | |
raise ApiException(status=404, reason="Not Found", msg="Secret not found") | |
# Update the secret data | |
encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()} | |
secret.data.update(encoded_data) | |
# Update the secret in Kubernetes | |
return self.core_api.replace_namespaced_secret(name, self.namespace, secret) | |
def delete_secret_key(self, name: str, key: str): | |
"""Delete a key from the specified secret in the namespace. | |
Args: | |
name (str): The name of the secret. | |
key (str): The key to delete from the secret. | |
Returns: | |
V1Secret: The updated secret object. | |
""" | |
# Get the existing secret | |
secret = self.core_api.read_namespaced_secret(name, self.namespace) | |
if secret is None: | |
raise ApiException(status=404, reason="Not Found", msg="Secret not found") | |
# Delete the key from the secret data | |
if key in secret.data: | |
del secret.data[key] | |
else: | |
raise ApiException(status=404, reason="Not Found", msg="Key not found in the secret") | |
# Update the secret in Kubernetes | |
return self.core_api.replace_namespaced_secret(name, self.namespace, secret) | |
def delete_secret(self, name: str): | |
"""Delete a secret from the specified namespace. | |
Args: | |
name (str): The name of the secret to delete. | |
Returns: | |
V1Status: The status object indicating the success or failure of the operation. | |
""" | |
return self.core_api.delete_namespaced_secret(name, self.namespace) | |
# utility function to encode user_id to base64 lower case and numbers only | |
# this is required by kubernetes secret name restrictions | |
def encode_user_id(user_id: UUID | str) -> str: | |
# Handle UUID | |
if isinstance(user_id, UUID): | |
return f"uuid-{str(user_id).lower()}"[:253] | |
# Convert string to lowercase | |
user_id_ = str(user_id).lower() | |
# If the user_id looks like an email, replace @ and . with allowed characters | |
if "@" in user_id_ or "." in user_id_: | |
user_id_ = user_id_.replace("@", "-at-").replace(".", "-dot-") | |
# Encode the user_id to base64 | |
# encoded = base64.b64encode(user_id.encode("utf-8")).decode("utf-8") | |
# Replace characters not allowed in Kubernetes names | |
user_id_ = user_id_.replace("+", "-").replace("/", "_").rstrip("=") | |
# Ensure the name starts with an alphanumeric character | |
if not user_id_[0].isalnum(): | |
user_id_ = "a-" + user_id_ | |
# Truncate to 253 characters (Kubernetes name length limit) | |
user_id_ = user_id_[:253] | |
if not all(c.isalnum() or c in "-_" for c in user_id_): | |
msg = f"Invalid user_id: {user_id_}" | |
raise ValueError(msg) | |
# Ensure the name ends with an alphanumeric character | |
while not user_id_[-1].isalnum(): | |
user_id_ = user_id_[:-1] | |
return user_id_ | |