Tai Truong
fix readme
d202ada
from base64 import b64decode, b64encode
from http import HTTPStatus
from uuid import UUID
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from loguru import logger
class KubernetesSecretManager:
"""A class for managing Kubernetes secrets."""
def __init__(self, namespace: str = "langflow"):
"""Initialize the KubernetesSecretManager class.
Args:
namespace (str): The namespace in which to perform secret operations.
"""
config.load_kube_config()
self.namespace = namespace
# initialize the Kubernetes API client
self.core_api = client.CoreV1Api()
def create_secret(
self,
name: str,
data: dict,
secret_type: str = "Opaque", # noqa: S107
):
"""Create a new secret in the specified namespace.
Args:
name (str): The name of the secret to create.
data (dict): A dictionary containing the key-value pairs for the secret data.
secret_type (str, optional): The type of secret to create. Defaults to 'Opaque'.
Returns:
V1Secret: The created secret object.
"""
encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()}
secret_metadata = client.V1ObjectMeta(name=name)
secret = client.V1Secret(
api_version="v1", kind="Secret", metadata=secret_metadata, type=secret_type, data=encoded_data
)
return self.core_api.create_namespaced_secret(self.namespace, secret)
def upsert_secret(self, secret_name: str, data: dict):
"""Upsert a secret in the specified namespace.
If the secret doesn't exist, it will be created.
If it exists, it will be updated with new data while preserving existing keys.
:param secret_name: Name of the secret
:param new_data: Dictionary containing new key-value pairs for the secret
:return: Created or updated secret object
"""
try:
# Try to read the existing secret
existing_secret = self.core_api.read_namespaced_secret(secret_name, self.namespace)
# If secret exists, update it
existing_data = {k: b64decode(v).decode() for k, v in existing_secret.data.items()}
existing_data.update(data)
# Encode all data to base64
encoded_data = {k: b64encode(v.encode()).decode() for k, v in existing_data.items()}
# Update the existing secret
existing_secret.data = encoded_data
return self.core_api.replace_namespaced_secret(secret_name, self.namespace, existing_secret)
except ApiException as e:
if e.status == HTTPStatus.NOT_FOUND:
# Secret doesn't exist, create a new one
return self.create_secret(secret_name, data)
logger.exception(f"Error upserting secret {secret_name}")
raise
def get_secret(self, name: str) -> dict | None:
"""Read a secret from the specified namespace.
Args:
name (str): The name of the secret to read.
Returns:
V1Secret: The secret object.
"""
try:
secret = self.core_api.read_namespaced_secret(name, self.namespace)
return {k: b64decode(v).decode() for k, v in secret.data.items()}
except ApiException as e:
if e.status == HTTPStatus.NOT_FOUND:
return None
raise
def update_secret(self, name: str, data: dict):
"""Update an existing secret in the specified namespace.
Args:
name (str): The name of the secret to update.
data (dict): A dictionary containing the key-value pairs for the updated secret data.
Returns:
V1Secret: The updated secret object.
"""
# Get the existing secret
secret = self.core_api.read_namespaced_secret(name, self.namespace)
if secret is None:
raise ApiException(status=404, reason="Not Found", msg="Secret not found")
# Update the secret data
encoded_data = {k: b64encode(v.encode()).decode() for k, v in data.items()}
secret.data.update(encoded_data)
# Update the secret in Kubernetes
return self.core_api.replace_namespaced_secret(name, self.namespace, secret)
def delete_secret_key(self, name: str, key: str):
"""Delete a key from the specified secret in the namespace.
Args:
name (str): The name of the secret.
key (str): The key to delete from the secret.
Returns:
V1Secret: The updated secret object.
"""
# Get the existing secret
secret = self.core_api.read_namespaced_secret(name, self.namespace)
if secret is None:
raise ApiException(status=404, reason="Not Found", msg="Secret not found")
# Delete the key from the secret data
if key in secret.data:
del secret.data[key]
else:
raise ApiException(status=404, reason="Not Found", msg="Key not found in the secret")
# Update the secret in Kubernetes
return self.core_api.replace_namespaced_secret(name, self.namespace, secret)
def delete_secret(self, name: str):
"""Delete a secret from the specified namespace.
Args:
name (str): The name of the secret to delete.
Returns:
V1Status: The status object indicating the success or failure of the operation.
"""
return self.core_api.delete_namespaced_secret(name, self.namespace)
# utility function to encode user_id to base64 lower case and numbers only
# this is required by kubernetes secret name restrictions
def encode_user_id(user_id: UUID | str) -> str:
# Handle UUID
if isinstance(user_id, UUID):
return f"uuid-{str(user_id).lower()}"[:253]
# Convert string to lowercase
user_id_ = str(user_id).lower()
# If the user_id looks like an email, replace @ and . with allowed characters
if "@" in user_id_ or "." in user_id_:
user_id_ = user_id_.replace("@", "-at-").replace(".", "-dot-")
# Encode the user_id to base64
# encoded = base64.b64encode(user_id.encode("utf-8")).decode("utf-8")
# Replace characters not allowed in Kubernetes names
user_id_ = user_id_.replace("+", "-").replace("/", "_").rstrip("=")
# Ensure the name starts with an alphanumeric character
if not user_id_[0].isalnum():
user_id_ = "a-" + user_id_
# Truncate to 253 characters (Kubernetes name length limit)
user_id_ = user_id_[:253]
if not all(c.isalnum() or c in "-_" for c in user_id_):
msg = f"Invalid user_id: {user_id_}"
raise ValueError(msg)
# Ensure the name ends with an alphanumeric character
while not user_id_[-1].isalnum():
user_id_ = user_id_[:-1]
return user_id_