Tai Truong
fix readme
d202ada
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from loguru import logger
from sqlalchemy import delete
from sqlalchemy import exc as sqlalchemy_exc
from sqlmodel import col, select
from langflow.services.auth.utils import create_super_user, verify_password
from langflow.services.cache.factory import CacheServiceFactory
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
from langflow.services.database.utils import initialize_database
from langflow.services.schema import ServiceType
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD
from .deps import get_db_service, get_service, get_settings_service
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.services.settings.manager import SettingsService
async def get_or_create_super_user(session: AsyncSession, username, password, is_default):
from langflow.services.database.models.user.model import User
stmt = select(User).where(User.username == username)
user = (await session.exec(stmt)).first()
if user and user.is_superuser:
return None # Superuser already exists
if user and is_default:
if user.is_superuser:
if verify_password(password, user.password):
return None
# Superuser exists but password is incorrect
# which means that the user has changed the
# base superuser credentials.
# This means that the user has already created
# a superuser and changed the password in the UI
# so we don't need to do anything.
logger.debug(
"Superuser exists but password is incorrect. "
"This means that the user has changed the "
"base superuser credentials."
)
return None
logger.debug("User with superuser credentials exists but is not a superuser.")
return None
if user:
if verify_password(password, user.password):
msg = "User with superuser credentials exists but is not a superuser."
raise ValueError(msg)
msg = "Incorrect superuser credentials"
raise ValueError(msg)
if is_default:
logger.debug("Creating default superuser.")
else:
logger.debug("Creating superuser.")
try:
return await create_super_user(username, password, db=session)
except Exception as exc: # noqa: BLE001
if "UNIQUE constraint failed: user.username" in str(exc):
# This is to deal with workers running this
# at startup and trying to create the superuser
# at the same time.
logger.opt(exception=True).debug("Superuser already exists.")
return None
logger.opt(exception=True).debug("Error creating superuser.")
async def setup_superuser(settings_service, session: AsyncSession) -> None:
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("AUTO_LOGIN is set to True. Creating default superuser.")
else:
# Remove the default superuser if it exists
await teardown_superuser(settings_service, session)
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
is_default = (username == DEFAULT_SUPERUSER) and (password == DEFAULT_SUPERUSER_PASSWORD)
try:
user = await get_or_create_super_user(
session=session, username=username, password=password, is_default=is_default
)
if user is not None:
logger.debug("Superuser created successfully.")
except Exception as exc:
logger.exception(exc)
msg = "Could not create superuser. Please create a superuser manually."
raise RuntimeError(msg) from exc
finally:
settings_service.auth_settings.reset_credentials()
async def teardown_superuser(settings_service, session: AsyncSession) -> None:
"""Teardown the superuser."""
# If AUTO_LOGIN is True, we will remove the default superuser
# from the database.
if not settings_service.auth_settings.AUTO_LOGIN:
try:
logger.debug("AUTO_LOGIN is set to False. Removing default superuser if exists.")
username = DEFAULT_SUPERUSER
from langflow.services.database.models.user.model import User
stmt = select(User).where(User.username == username)
user = (await session.exec(stmt)).first()
# Check if super was ever logged in, if not delete it
# if it has logged in, it means the user is using it to login
if user and user.is_superuser is True and not user.last_login_at:
await session.delete(user)
await session.commit()
logger.debug("Default superuser removed successfully.")
except Exception as exc:
logger.exception(exc)
await session.rollback()
msg = "Could not remove default superuser."
raise RuntimeError(msg) from exc
async def teardown_services() -> None:
"""Teardown all the services."""
try:
async with get_db_service().with_async_session() as session:
await teardown_superuser(get_settings_service(), session)
except Exception as exc: # noqa: BLE001
logger.exception(exc)
try:
from langflow.services.manager import service_manager
await service_manager.teardown()
except Exception as exc: # noqa: BLE001
logger.exception(exc)
def initialize_settings_service() -> None:
"""Initialize the settings manager."""
from langflow.services.settings import factory as settings_factory
get_service(ServiceType.SETTINGS_SERVICE, settings_factory.SettingsServiceFactory())
def initialize_session_service() -> None:
"""Initialize the session manager."""
from langflow.services.cache import factory as cache_factory
from langflow.services.session import factory as session_service_factory
initialize_settings_service()
get_service(
ServiceType.CACHE_SERVICE,
cache_factory.CacheServiceFactory(),
)
get_service(
ServiceType.SESSION_SERVICE,
session_service_factory.SessionServiceFactory(),
)
async def clean_transactions(settings_service: SettingsService, session: AsyncSession) -> None:
"""Clean up old transactions from the database.
This function deletes transactions that exceed the maximum number to keep (configured in settings).
It orders transactions by timestamp descending and removes the oldest ones beyond the limit.
Args:
settings_service: The settings service containing configuration like max_transactions_to_keep
session: The database session to use for the deletion
Returns:
None
"""
try:
# Delete transactions using bulk delete
delete_stmt = delete(TransactionTable).where(
col(TransactionTable.id).in_(
select(TransactionTable.id)
.order_by(col(TransactionTable.timestamp).desc())
.offset(settings_service.settings.max_transactions_to_keep)
)
)
await session.exec(delete_stmt)
await session.commit()
logger.debug("Successfully cleaned up old transactions")
except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc:
logger.error(f"Error cleaning up transactions: {exc!s}")
await session.rollback()
# Don't re-raise since this is a cleanup task
async def clean_vertex_builds(settings_service: SettingsService, session: AsyncSession) -> None:
"""Clean up old vertex builds from the database.
This function deletes vertex builds that exceed the maximum number to keep (configured in settings).
It orders vertex builds by timestamp descending and removes the oldest ones beyond the limit.
Args:
settings_service: The settings service containing configuration like max_vertex_builds_to_keep
session: The database session to use for the deletion
Returns:
None
"""
try:
# Delete vertex builds using bulk delete
delete_stmt = delete(VertexBuildTable).where(
col(VertexBuildTable.id).in_(
select(VertexBuildTable.id)
.order_by(col(VertexBuildTable.timestamp).desc())
.offset(settings_service.settings.max_vertex_builds_to_keep)
)
)
await session.exec(delete_stmt)
await session.commit()
logger.debug("Successfully cleaned up old vertex builds")
except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc:
logger.error(f"Error cleaning up vertex builds: {exc!s}")
await session.rollback()
# Don't re-raise since this is a cleanup task
async def initialize_services(*, fix_migration: bool = False) -> None:
"""Initialize all the services needed."""
# Test cache connection
get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory())
# Setup the superuser
await asyncio.to_thread(initialize_database, fix_migration=fix_migration)
async with get_db_service().with_async_session() as session:
settings_service = get_service(ServiceType.SETTINGS_SERVICE)
await setup_superuser(settings_service, session)
try:
await get_db_service().assign_orphaned_flows_to_superuser()
except sqlalchemy_exc.IntegrityError as exc:
logger.warning(f"Error assigning orphaned flows to the superuser: {exc!s}")
await clean_transactions(settings_service, session)
await clean_vertex_builds(settings_service, session)