Tai Truong
fix readme
d202ada
from __future__ import annotations
import asyncio
import os
from typing import TYPE_CHECKING
from loguru import logger
from typing_extensions import override
from langflow.services.auth import utils as auth_utils
from langflow.services.base import Service
from langflow.services.database.models.variable.model import Variable, VariableCreate
from langflow.services.variable.base import VariableService
from langflow.services.variable.constants import CREDENTIAL_TYPE, GENERIC_TYPE
from langflow.services.variable.kubernetes_secrets import KubernetesSecretManager, encode_user_id
if TYPE_CHECKING:
from uuid import UUID
from sqlmodel import Session
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.services.settings.service import SettingsService
class KubernetesSecretService(VariableService, Service):
def __init__(self, settings_service: SettingsService):
self.settings_service = settings_service
# TODO: settings_service to set kubernetes namespace
self.kubernetes_secrets = KubernetesSecretManager()
@override
async def initialize_user_variables(self, user_id: UUID | str, session: AsyncSession) -> None:
# Check for environment variables that should be stored in the database
should_or_should_not = "Should" if self.settings_service.settings.store_environment_variables else "Should not"
logger.info(f"{should_or_should_not} store environment variables in the kubernetes.")
if self.settings_service.settings.store_environment_variables:
variables = {}
for var in self.settings_service.settings.variables_to_get_from_environment:
if var in os.environ:
logger.debug(f"Creating {var} variable from environment.")
value = os.environ[var]
if isinstance(value, str):
value = value.strip()
key = CREDENTIAL_TYPE + "_" + var
variables[key] = str(value)
try:
secret_name = encode_user_id(user_id)
await asyncio.to_thread(
self.kubernetes_secrets.create_secret,
name=secret_name,
data=variables,
)
except Exception: # noqa: BLE001
logger.exception(f"Error creating {var} variable")
else:
logger.info("Skipping environment variable storage.")
# resolve_variable is a helper function that resolves the variable name to the actual key in the secret
def resolve_variable(
self,
secret_name: str,
user_id: UUID | str,
name: str,
) -> tuple[str, str]:
variables = self.kubernetes_secrets.get_secret(name=secret_name)
if not variables:
msg = f"user_id {user_id} variable not found."
raise ValueError(msg)
if name in variables:
return name, variables[name]
credential_name = CREDENTIAL_TYPE + "_" + name
if credential_name in variables:
return credential_name, variables[credential_name]
msg = f"user_id {user_id} variable name {name} not found."
raise ValueError(msg)
@override
def get_variable(
self,
user_id: UUID | str,
name: str,
field: str,
session: Session,
) -> str:
secret_name = encode_user_id(user_id)
key, value = self.resolve_variable(secret_name, user_id, name)
if key.startswith(CREDENTIAL_TYPE + "_") and field == "session_id":
msg = (
f"variable {name} of type 'Credential' cannot be used in a Session ID field "
"because its purpose is to prevent the exposure of values."
)
raise TypeError(msg)
return value
@override
def list_variables_sync(
self,
user_id: UUID | str,
session: Session,
) -> list[str | None]:
variables = self.kubernetes_secrets.get_secret(name=encode_user_id(user_id))
if not variables:
return []
names = []
for key in variables:
if key.startswith(CREDENTIAL_TYPE + "_"):
names.append(key[len(CREDENTIAL_TYPE) + 1 :])
else:
names.append(key)
return names
@override
async def list_variables(
self,
user_id: UUID | str,
session: AsyncSession,
) -> list[str | None]:
return await asyncio.to_thread(self.list_variables_sync, user_id, session.sync_session)
def _update_variable(
self,
user_id: UUID | str,
name: str,
value: str,
):
secret_name = encode_user_id(user_id)
secret_key, _ = self.resolve_variable(secret_name, user_id, name)
return self.kubernetes_secrets.update_secret(name=secret_name, data={secret_key: value})
@override
async def update_variable(
self,
user_id: UUID | str,
name: str,
value: str,
session: AsyncSession,
):
return await asyncio.to_thread(self._update_variable, user_id, name, value)
def _delete_variable(self, user_id: UUID | str, name: str) -> None:
secret_name = encode_user_id(user_id)
secret_key, _ = self.resolve_variable(secret_name, user_id, name)
self.kubernetes_secrets.delete_secret_key(name=secret_name, key=secret_key)
@override
async def delete_variable(self, user_id: UUID | str, name: str, session: AsyncSession) -> None:
await asyncio.to_thread(self._delete_variable, user_id, name)
@override
async def delete_variable_by_id(self, user_id: UUID | str, variable_id: UUID | str, session: AsyncSession) -> None:
await self.delete_variable(user_id, str(variable_id), session)
@override
async def create_variable(
self,
user_id: UUID | str,
name: str,
value: str,
*,
default_fields: list[str],
type_: str,
session: AsyncSession,
) -> Variable:
secret_name = encode_user_id(user_id)
secret_key = name
if type_ == CREDENTIAL_TYPE:
secret_key = CREDENTIAL_TYPE + "_" + name
else:
type_ = GENERIC_TYPE
await asyncio.to_thread(
self.kubernetes_secrets.upsert_secret, secret_name=secret_name, data={secret_key: value}
)
variable_base = VariableCreate(
name=name,
type=type_,
value=auth_utils.encrypt_api_key(value, settings_service=self.settings_service),
default_fields=default_fields,
)
return Variable.model_validate(variable_base, from_attributes=True, update={"user_id": user_id})