import asyncio import inspect import platform import signal import socket import sys import time import warnings from pathlib import Path import click import httpx import typer from dotenv import load_dotenv from httpx import HTTPError from multiprocess import cpu_count from multiprocess.context import Process from packaging import version as pkg_version from rich import box from rich import print as rprint from rich.console import Console from rich.panel import Panel from rich.table import Table from sqlmodel import select from langflow.logging.logger import configure, logger from langflow.main import setup_app from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist from langflow.services.database.utils import async_session_getter from langflow.services.deps import async_session_scope, get_db_service, get_settings_service from langflow.services.settings.constants import DEFAULT_SUPERUSER from langflow.services.utils import initialize_services from langflow.utils.version import fetch_latest_version, get_version_info from langflow.utils.version import is_pre_release as langflow_is_pre_release console = Console() app = typer.Typer(no_args_is_help=True) def get_number_of_workers(workers=None): if workers == -1 or workers is None: workers = (cpu_count() * 2) + 1 logger.debug(f"Number of workers: {workers}") return workers def display_results(results) -> None: """Display the results of the migration.""" for table_results in results: table = Table(title=f"Migration {table_results.table_name}") table.add_column("Name") table.add_column("Type") table.add_column("Status") for result in table_results.results: status = "Success" if result.success else "Failure" color = "green" if result.success else "red" table.add_row(result.name, result.type, f"[{color}]{status}[/{color}]") console.print(table) console.print() # Print a new line def set_var_for_macos_issue() -> None: # OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES # we need to set this var is we are running on MacOS # otherwise we get an error when running gunicorn if platform.system() == "Darwin": import os os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES" # https://stackoverflow.com/questions/75747888/uwsgi-segmentation-fault-with-flask-python-app-behind-nginx-after-running-for-2 # noqa: E501 os.environ["no_proxy"] = "*" # to avoid error with gunicorn logger.debug("Set OBJC_DISABLE_INITIALIZE_FORK_SAFETY to YES to avoid error") def handle_sigterm(signum, frame): # noqa: ARG001 """Handle SIGTERM signal gracefully.""" logger.info("Received SIGTERM signal. Performing graceful shutdown...") # Raise SystemExit to trigger graceful shutdown sys.exit(0) @app.command() def run( *, host: str | None = typer.Option(None, help="Host to bind the server to.", show_default=False), workers: int | None = typer.Option(None, help="Number of worker processes.", show_default=False), worker_timeout: int | None = typer.Option(None, help="Worker timeout in seconds.", show_default=False), port: int | None = typer.Option(None, help="Port to listen on.", show_default=False), components_path: Path | None = typer.Option( Path(__file__).parent / "components", help="Path to the directory containing custom components.", show_default=False, ), # .env file param env_file: Path | None = typer.Option( None, help="Path to the .env file containing environment variables.", show_default=False, ), log_level: str | None = typer.Option(None, help="Logging level.", show_default=False), log_file: Path | None = typer.Option(None, help="Path to the log file.", show_default=False), cache: str | None = typer.Option( # noqa: ARG001 None, help="Type of cache to use. (InMemoryCache, SQLiteCache)", show_default=False, ), dev: bool | None = typer.Option(None, help="Run in development mode (may contain bugs)", show_default=False), # noqa: ARG001 frontend_path: str | None = typer.Option( None, help="Path to the frontend directory containing build files. This is for development purposes only.", show_default=False, ), open_browser: bool | None = typer.Option( None, help="Open the browser after starting the server.", show_default=False, ), remove_api_keys: bool | None = typer.Option( # noqa: ARG001 None, help="Remove API keys from the projects saved in the database.", show_default=False, ), backend_only: bool | None = typer.Option( None, help="Run only the backend server without the frontend.", show_default=False, ), store: bool | None = typer.Option( # noqa: ARG001 None, help="Enables the store features.", show_default=False, ), auto_saving: bool | None = typer.Option( # noqa: ARG001 None, help="Defines if the auto save is enabled.", show_default=False, ), auto_saving_interval: int | None = typer.Option( # noqa: ARG001 None, help="Defines the debounce time for the auto save.", show_default=False, ), health_check_max_retries: bool | None = typer.Option( # noqa: ARG001 None, help="Defines the number of retries for the health check.", show_default=False, ), max_file_size_upload: int | None = typer.Option( # noqa: ARG001 None, help="Defines the maximum file size for the upload in MB.", show_default=False, ), ) -> None: """Run Langflow.""" # Register SIGTERM handler signal.signal(signal.SIGTERM, handle_sigterm) if env_file: load_dotenv(env_file, override=True) configure(log_level=log_level, log_file=log_file) logger.debug(f"Loading config from file: '{env_file}'" if env_file else "No env_file provided.") set_var_for_macos_issue() settings_service = get_settings_service() frame = inspect.currentframe() valid_args: list = [] values: dict = {} if frame is not None: arguments, _, _, values = inspect.getargvalues(frame) valid_args = [arg for arg in arguments if values[arg] is not None] for arg in valid_args: if arg == "components_path": settings_service.settings.update_settings(components_path=components_path) elif hasattr(settings_service.settings, arg): settings_service.set(arg, values[arg]) logger.debug(f"Loading config from cli parameter '{arg}': '{values[arg]}'") host = settings_service.settings.host port = settings_service.settings.port workers = settings_service.settings.workers worker_timeout = settings_service.settings.worker_timeout log_level = settings_service.settings.log_level frontend_path = settings_service.settings.frontend_path backend_only = settings_service.settings.backend_only # create path object if frontend_path is provided static_files_dir: Path | None = Path(frontend_path) if frontend_path else None app = setup_app(static_files_dir=static_files_dir, backend_only=backend_only) # check if port is being used if is_port_in_use(port, host): port = get_free_port(port) options = { "bind": f"{host}:{port}", "workers": get_number_of_workers(workers), "timeout": worker_timeout, } # Define an env variable to know if we are just testing the server if "pytest" in sys.modules: return process: Process | None = None try: if platform.system() == "Windows": # Run using uvicorn on MacOS and Windows # Windows doesn't support gunicorn # MacOS requires an env variable to be set to use gunicorn run_on_windows(host, port, log_level, options, app) else: # Run using gunicorn on Linux process = run_on_mac_or_linux(host, port, log_level, options, app) if open_browser and not backend_only: click.launch(f"http://{host}:{port}") if process: process.join() except (KeyboardInterrupt, SystemExit) as e: logger.info("Shutting down server...") if process is not None: process.terminate() process.join(timeout=15) # Wait up to 15 seconds for process to terminate if process.is_alive(): logger.warning("Process did not terminate gracefully, forcing...") process.kill() raise typer.Exit(0) from e except Exception as e: logger.exception(e) if process is not None: process.terminate() raise typer.Exit(1) from e def wait_for_server_ready(host, port) -> None: """Wait for the server to become ready by polling the health endpoint.""" status_code = 0 while status_code != httpx.codes.OK: try: status_code = httpx.get(f"http://{host}:{port}/health").status_code except HTTPError: time.sleep(1) except Exception: # noqa: BLE001 logger.opt(exception=True).debug("Error while waiting for the server to become ready.") time.sleep(1) def run_on_mac_or_linux(host, port, log_level, options, app): webapp_process = Process(target=run_langflow, args=(host, port, log_level, options, app)) webapp_process.start() wait_for_server_ready(host, port) print_banner(host, port) return webapp_process def run_on_windows(host, port, log_level, options, app) -> None: """Run the Langflow server on Windows.""" print_banner(host, port) run_langflow(host, port, log_level, options, app) def is_port_in_use(port, host="localhost"): """Check if a port is in use. Args: port (int): The port number to check. host (str): The host to check the port on. Defaults to 'localhost'. Returns: bool: True if the port is in use, False otherwise. """ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex((host, port)) == 0 def get_free_port(port): """Given a used port, find a free port. Args: port (int): The port number to check. Returns: int: A free port number. """ while is_port_in_use(port): port += 1 return port def get_letter_from_version(version: str) -> str | None: """Get the letter from a pre-release version.""" if "a" in version: return "a" if "b" in version: return "b" if "rc" in version: return "rc" return None def build_version_notice(current_version: str, package_name: str) -> str: latest_version = fetch_latest_version(package_name, include_prerelease=langflow_is_pre_release(current_version)) if latest_version and pkg_version.parse(current_version) < pkg_version.parse(latest_version): release_type = "pre-release" if langflow_is_pre_release(latest_version) else "version" return f"A new {release_type} of {package_name} is available: {latest_version}" return "" def generate_pip_command(package_names, is_pre_release) -> str: """Generate the pip install command based on the packages and whether it's a pre-release.""" base_command = "pip install" if is_pre_release: return f"{base_command} {' '.join(package_names)} -U --pre" return f"{base_command} {' '.join(package_names)} -U" def stylize_text(text: str, to_style: str, *, is_prerelease: bool) -> str: color = "#42a7f5" if is_prerelease else "#6e42f5" # return "".join(f"[{color}]{char}[/]" for char in text) styled_text = f"[{color}]{to_style}[/]" return text.replace(to_style, styled_text) def print_banner(host: str, port: int) -> None: notices = [] package_names = [] # Track package names for pip install instructions is_pre_release = False # Track if any package is a pre-release package_name = "" # Use langflow.utils.version to get the version info version_info = get_version_info() langflow_version = version_info["version"] package_name = version_info["package"] is_pre_release |= langflow_is_pre_release(langflow_version) # Update pre-release status notice = build_version_notice(langflow_version, package_name) notice = stylize_text(notice, package_name, is_prerelease=is_pre_release) if notice: notices.append(notice) package_names.append(package_name) # Generate pip command based on the collected data pip_command = generate_pip_command(package_names, is_pre_release) # Add pip install command to notices if any package needs an update if notices: notices.append(f"Run '{pip_command}' to update.") styled_notices = [f"[bold]{notice}[/bold]" for notice in notices if notice] styled_package_name = stylize_text( package_name, package_name, is_prerelease=any("pre-release" in notice for notice in notices) ) title = f"[bold]Welcome to :chains: {styled_package_name}[/bold]\n" info_text = ( "Collaborate, and contribute at our " "[bold][link=https://github.com/langflow-ai/langflow]GitHub Repo[/link][/bold] :star2:" ) telemetry_text = ( "We collect anonymous usage data to improve Langflow.\n" "You can opt-out by setting [bold]DO_NOT_TRACK=true[/bold] in your environment." ) access_link = f"Access [link=http://{host}:{port}]http://{host}:{port}[/link]" panel_content = "\n\n".join([title, *styled_notices, info_text, telemetry_text, access_link]) panel = Panel(panel_content, box=box.ROUNDED, border_style="blue", expand=False) rprint(panel) def run_langflow(host, port, log_level, options, app) -> None: """Run Langflow server on localhost.""" if platform.system() == "Windows": import uvicorn uvicorn.run( app, host=host, port=port, log_level=log_level.lower(), loop="asyncio", ) else: from langflow.server import LangflowApplication server = LangflowApplication(app, options) def graceful_shutdown(signum, frame): # noqa: ARG001 """Gracefully shutdown the server when receiving SIGTERM.""" # Suppress click exceptions during shutdown import click click.echo = lambda *args, **kwargs: None # noqa: ARG005 logger.info("Gracefully shutting down server...") # For Gunicorn workers, we raise SystemExit to trigger graceful shutdown raise SystemExit(0) # Register signal handlers signal.signal(signal.SIGTERM, graceful_shutdown) signal.signal(signal.SIGINT, graceful_shutdown) try: server.run() except (KeyboardInterrupt, SystemExit): # Suppress the exception output sys.exit(0) @app.command() def superuser( username: str = typer.Option(..., prompt=True, help="Username for the superuser."), password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."), log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"), ) -> None: """Create a superuser.""" configure(log_level=log_level) db_service = get_db_service() async def _create_superuser(): await initialize_services() async with async_session_getter(db_service) as session: from langflow.services.auth.utils import create_super_user if await create_super_user(db=session, username=username, password=password): # Verify that the superuser was created from langflow.services.database.models.user.model import User stmt = select(User).where(User.username == username) user: User = (await session.exec(stmt)).first() if user is None or not user.is_superuser: typer.echo("Superuser creation failed.") return # Now create the first folder for the user result = await create_default_folder_if_it_doesnt_exist(session, user.id) if result: typer.echo("Default folder created successfully.") else: msg = "Could not create default folder." raise RuntimeError(msg) typer.echo("Superuser created successfully.") else: typer.echo("Superuser creation failed.") asyncio.run(_create_superuser()) # command to copy the langflow database from the cache to the current directory # because now the database is stored per installation @app.command() def copy_db() -> None: """Copy the database files to the current directory. This function copies the 'langflow.db' and 'langflow-pre.db' files from the cache directory to the current directory. If the files exist in the cache directory, they will be copied to the same directory as this script (__main__.py). Returns: None """ import shutil from platformdirs import user_cache_dir cache_dir = Path(user_cache_dir("langflow")) db_path = cache_dir / "langflow.db" pre_db_path = cache_dir / "langflow-pre.db" # It should be copied to the current directory # this file is __main__.py and it should be in the same directory as the database destination_folder = Path(__file__).parent if db_path.exists(): shutil.copy(db_path, destination_folder) typer.echo(f"Database copied to {destination_folder}") else: typer.echo("Database not found in the cache directory.") if pre_db_path.exists(): shutil.copy(pre_db_path, destination_folder) typer.echo(f"Pre-release database copied to {destination_folder}") else: typer.echo("Pre-release database not found in the cache directory.") @app.command() def migration( test: bool = typer.Option(default=True, help="Run migrations in test mode."), # noqa: FBT001 fix: bool = typer.Option( # noqa: FBT001 default=False, help="Fix migrations. This is a destructive operation, and should only be used if you know what you are doing.", ), ) -> None: """Run or test migrations.""" if fix and not typer.confirm( "This will delete all data necessary to fix migrations. Are you sure you want to continue?" ): raise typer.Abort asyncio.run(initialize_services(fix_migration=fix)) db_service = get_db_service() if not test: db_service.run_migrations() results = db_service.run_migrations_test() display_results(results) @app.command() def api_key( log_level: str = typer.Option("error", help="Logging level."), ) -> None: """Creates an API key for the default superuser if AUTO_LOGIN is enabled. Args: log_level (str, optional): Logging level. Defaults to "error". Returns: None """ configure(log_level=log_level) async def aapi_key(): await initialize_services() settings_service = get_settings_service() auth_settings = settings_service.auth_settings if not auth_settings.AUTO_LOGIN: typer.echo("Auto login is disabled. API keys cannot be created through the CLI.") return None async with async_session_scope() as session: from langflow.services.database.models.user.model import User stmt = select(User).where(User.username == DEFAULT_SUPERUSER) superuser = (await session.exec(stmt)).first() if not superuser: typer.echo( "Default superuser not found. This command requires a superuser and AUTO_LOGIN to be enabled." ) return None from langflow.services.database.models.api_key import ApiKey, ApiKeyCreate from langflow.services.database.models.api_key.crud import create_api_key, delete_api_key stmt = select(ApiKey).where(ApiKey.user_id == superuser.id) api_key = (await session.exec(stmt)).first() if api_key: await delete_api_key(session, api_key.id) api_key_create = ApiKeyCreate(name="CLI") unmasked_api_key = await create_api_key(session, api_key_create, user_id=superuser.id) await session.commit() return unmasked_api_key unmasked_api_key = asyncio.run(aapi_key()) # Create a banner to display the API key and tell the user it won't be shown again api_key_banner(unmasked_api_key) def show_version(*, value: bool): if value: default = "DEV" raw_info = get_version_info() version = raw_info.get("version", default) if raw_info else default typer.echo(f"langflow {version}") raise typer.Exit @app.callback() def version_option( *, version: bool = typer.Option( None, "--version", "-v", callback=show_version, is_eager=True, help="Show the version and exit.", ), ): pass def api_key_banner(unmasked_api_key) -> None: is_mac = platform.system() == "Darwin" import pyperclip pyperclip.copy(unmasked_api_key.api_key) panel = Panel( f"[bold]API Key Created Successfully:[/bold]\n\n" f"[bold blue]{unmasked_api_key.api_key}[/bold blue]\n\n" "This is the only time the API key will be displayed. \n" "Make sure to store it in a secure location. \n\n" f"The API key has been copied to your clipboard. [bold]{['Ctrl', 'Cmd'][is_mac]} + V[/bold] to paste it.", box=box.ROUNDED, border_style="blue", expand=False, ) console = Console() console.print(panel) def main() -> None: with warnings.catch_warnings(): warnings.simplefilter("ignore") app() if __name__ == "__main__": try: main() except Exception as e: logger.exception(e) raise typer.Exit(1) from e