Spaces:
Running
Running
import secrets | |
from pathlib import Path | |
from typing import Literal | |
from loguru import logger | |
from passlib.context import CryptContext | |
from pydantic import Field, SecretStr, field_validator | |
from pydantic_settings import BaseSettings | |
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD | |
from langflow.services.settings.utils import read_secret_from_file, write_secret_to_file | |
class AuthSettings(BaseSettings): | |
# Login settings | |
CONFIG_DIR: str | |
SECRET_KEY: SecretStr = Field( | |
default=SecretStr(""), | |
description="Secret key for JWT. If not provided, a random one will be generated.", | |
frozen=False, | |
) | |
ALGORITHM: str = "HS256" | |
ACCESS_TOKEN_EXPIRE_SECONDS: int = 60 * 60 # 1 hour | |
REFRESH_TOKEN_EXPIRE_SECONDS: int = 60 * 60 * 24 * 7 # 7 days | |
# API Key to execute /process endpoint | |
API_KEY_ALGORITHM: str = "HS256" | |
API_V1_STR: str = "/api/v1" | |
# If AUTO_LOGIN = True | |
# > The application does not request login and logs in automatically as a super user. | |
AUTO_LOGIN: bool = True | |
NEW_USER_IS_ACTIVE: bool = False | |
SUPERUSER: str = DEFAULT_SUPERUSER | |
SUPERUSER_PASSWORD: str = DEFAULT_SUPERUSER_PASSWORD | |
REFRESH_SAME_SITE: Literal["lax", "strict", "none"] = "none" | |
"""The SameSite attribute of the refresh token cookie.""" | |
REFRESH_SECURE: bool = True | |
"""The Secure attribute of the refresh token cookie.""" | |
REFRESH_HTTPONLY: bool = True | |
"""The HttpOnly attribute of the refresh token cookie.""" | |
ACCESS_SAME_SITE: Literal["lax", "strict", "none"] = "lax" | |
"""The SameSite attribute of the access token cookie.""" | |
ACCESS_SECURE: bool = False | |
"""The Secure attribute of the access token cookie.""" | |
ACCESS_HTTPONLY: bool = False | |
"""The HttpOnly attribute of the access token cookie.""" | |
COOKIE_DOMAIN: str | None = None | |
"""The domain attribute of the cookies. If None, the domain is not set.""" | |
pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
class Config: | |
validate_assignment = True | |
extra = "ignore" | |
env_prefix = "LANGFLOW_" | |
def reset_credentials(self) -> None: | |
self.SUPERUSER = DEFAULT_SUPERUSER | |
self.SUPERUSER_PASSWORD = DEFAULT_SUPERUSER_PASSWORD | |
# If autologin is true, then we need to set the credentials to | |
# the default values | |
# so we need to validate the superuser and superuser_password | |
# fields | |
def validate_superuser(cls, value, info): | |
if info.data.get("AUTO_LOGIN"): | |
if value != DEFAULT_SUPERUSER: | |
value = DEFAULT_SUPERUSER | |
logger.debug("Resetting superuser to default value") | |
if info.data.get("SUPERUSER_PASSWORD") != DEFAULT_SUPERUSER_PASSWORD: | |
info.data["SUPERUSER_PASSWORD"] = DEFAULT_SUPERUSER_PASSWORD | |
logger.debug("Resetting superuser password to default value") | |
return value | |
return value | |
def get_secret_key(cls, value, info): | |
config_dir = info.data.get("CONFIG_DIR") | |
if not config_dir: | |
logger.debug("No CONFIG_DIR provided, not saving secret key") | |
return value or secrets.token_urlsafe(32) | |
secret_key_path = Path(config_dir) / "secret_key" | |
if value: | |
logger.debug("Secret key provided") | |
secret_value = value.get_secret_value() if isinstance(value, SecretStr) else value | |
write_secret_to_file(secret_key_path, secret_value) | |
else: | |
logger.debug("No secret key provided, generating a random one") | |
if secret_key_path.exists(): | |
value = read_secret_from_file(secret_key_path) | |
logger.debug("Loaded secret key") | |
if not value: | |
value = secrets.token_urlsafe(32) | |
write_secret_to_file(secret_key_path, value) | |
logger.debug("Saved secret key") | |
else: | |
value = secrets.token_urlsafe(32) | |
write_secret_to_file(secret_key_path, value) | |
logger.debug("Saved secret key") | |
return value if isinstance(value, SecretStr) else SecretStr(value).get_secret_value() | |