import asyncio | |
import inspect | |
import platform | |
import signal | |
import socket | |
import sys | |
import time | |
import warnings | |
from pathlib import Path | |
import click | |
import httpx | |
import typer | |
from dotenv import load_dotenv | |
from httpx import HTTPError | |
from multiprocess import cpu_count | |
from multiprocess.context import Process | |
from packaging import version as pkg_version | |
from rich import box | |
from rich import print as rprint | |
from rich.console import Console | |
from rich.panel import Panel | |
from rich.table import Table | |
from sqlmodel import select | |
from langflow.logging.logger import configure, logger | |
from langflow.main import setup_app | |
from import create_default_folder_if_it_doesnt_exist | |
from import async_session_getter | |
from import async_session_scope, get_db_service, get_settings_service | |
from import DEFAULT_SUPERUSER | |
from import initialize_services | |
from langflow.utils.version import fetch_latest_version, get_version_info | |
from langflow.utils.version import is_pre_release as langflow_is_pre_release | |
# Module-level rich console; display_results() prints the migration tables through it.
console = Console()
# Typer application object that main() invokes; with no CLI arguments it shows the help text.
app = typer.Typer(no_args_is_help=True)
def get_number_of_workers(workers=None):
    """Return the number of worker processes to start.

    Args:
        workers: Requested worker count. ``None`` or ``-1`` means "derive it
            from the CPU count" using ``(cpu_count() * 2) + 1``.

    Returns:
        The requested count, or the CPU-derived default.
    """
    wants_default = workers is None or workers == -1
    if wants_default:
        workers = 2 * cpu_count() + 1
    logger.debug(f"Number of workers: {workers}")
    return workers
def display_results(results) -> None:
    """Display the results of the migration.

    Prints one rich table per entry in ``results``. Each entry is expected to
    expose ``table_name`` and an iterable ``results`` whose items expose
    ``name``, ``type`` and ``success``.
    """
    for table_results in results:
        table = Table(title=f"Migration {table_results.table_name}")
        table.add_column("Name")
        table.add_column("Type")
        table.add_column("Status")

        for result in table_results.results:
            status = "Success" if result.success else "Failure"
            color = "green" if result.success else "red"
            # First cell fills the "Name" column declared above.
            table.add_row(result.name, result.type, f"[{color}]{status}[/{color}]")

        console.print(table)
        console.print()  # Print a new line
def set_var_for_macos_issue() -> None:
    """Set the environment variables needed to run gunicorn on macOS.

    Does nothing on any other platform.
    """
    # We need to set these variables if we are running on macOS,
    # otherwise we get an error when running gunicorn.
    if platform.system() != "Darwin":
        return

    import os

    # The debug message below reports this variable as set, so actually set it.
    os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
    os.environ["no_proxy"] = "*"  # to avoid error with gunicorn
    logger.debug("Set OBJC_DISABLE_INITIALIZE_FORK_SAFETY to YES to avoid error")
def handle_sigterm(signum, frame):  # noqa: ARG001
    """Handle SIGTERM signal gracefully.

    Logs the event and exits with status 0. Both arguments are required by the
    ``signal`` handler signature and are unused.
    """
    # NOTE(review): the logging call was truncated in the source; info level is assumed.
    logger.info("Received SIGTERM signal. Performing graceful shutdown...")
    # Raise SystemExit to trigger graceful shutdown
    sys.exit(0)
# `run` flow (kept out of the docstring because Typer shows the docstring as help text):
#   1. install the SIGTERM handler, load the optional .env file, configure logging;
#   2. copy every CLI option that is not None into the settings service;
#   3. read the effective host/port/workers/... back from the settings;
#   4. build the app, pick a free port, and start the server
#      (in-process on Windows, in a child process elsewhere);
#   5. translate shutdown / failure into typer.Exit(0) / typer.Exit(1).
# The command is registered on `app` so that `app()` in main() can dispatch to it.
@app.command()
def run(
    *,
    host: str | None = typer.Option(None, help="Host to bind the server to.", show_default=False),
    workers: int | None = typer.Option(None, help="Number of worker processes.", show_default=False),
    worker_timeout: int | None = typer.Option(None, help="Worker timeout in seconds.", show_default=False),
    port: int | None = typer.Option(None, help="Port to listen on.", show_default=False),
    components_path: Path | None = typer.Option(
        Path(__file__).parent / "components",
        help="Path to the directory containing custom components.",
        show_default=False,
    ),
    # .env file param
    env_file: Path | None = typer.Option(
        None,
        help="Path to the .env file containing environment variables.",
        show_default=False,
    ),
    log_level: str | None = typer.Option(None, help="Logging level.", show_default=False),
    log_file: Path | None = typer.Option(None, help="Path to the log file.", show_default=False),
    cache: str | None = typer.Option(  # noqa: ARG001
        None,
        help="Type of cache to use. (InMemoryCache, SQLiteCache)",
        show_default=False,
    ),
    dev: bool | None = typer.Option(None, help="Run in development mode (may contain bugs)", show_default=False),  # noqa: ARG001
    frontend_path: str | None = typer.Option(
        None,
        help="Path to the frontend directory containing build files. This is for development purposes only.",
        show_default=False,
    ),
    open_browser: bool | None = typer.Option(
        None,
        help="Open the browser after starting the server.",
        show_default=False,
    ),
    remove_api_keys: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Remove API keys from the projects saved in the database.",
        show_default=False,
    ),
    backend_only: bool | None = typer.Option(
        None,
        help="Run only the backend server without the frontend.",
        show_default=False,
    ),
    store: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Enables the store features.",
        show_default=False,
    ),
    auto_saving: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines if the auto save is enabled.",
        show_default=False,
    ),
    auto_saving_interval: int | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the debounce time for the auto save.",
        show_default=False,
    ),
    # A retry *count*: declared as int so the CLI accepts a number instead of a boolean flag.
    health_check_max_retries: int | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the number of retries for the health check.",
        show_default=False,
    ),
    max_file_size_upload: int | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the maximum file size for the upload in MB.",
        show_default=False,
    ),
) -> None:
    """Run Langflow."""
    # Register SIGTERM handler
    signal.signal(signal.SIGTERM, handle_sigterm)

    if env_file:
        load_dotenv(env_file, override=True)

    configure(log_level=log_level, log_file=log_file)
    logger.debug(f"Loading config from file: '{env_file}'" if env_file else "No env_file provided.")
    set_var_for_macos_issue()
    settings_service = get_settings_service()

    # Inspect this function's own frame to discover which CLI options were supplied.
    # This must stay inside `run`: a helper would see its own frame, not these arguments.
    frame = inspect.currentframe()
    valid_args: list = []
    values: dict = {}
    if frame is not None:
        arguments, _, _, values = inspect.getargvalues(frame)
        valid_args = [arg for arg in arguments if values[arg] is not None]

    for arg in valid_args:
        if arg == "components_path":
            settings_service.settings.update_settings(components_path=components_path)
        elif hasattr(settings_service.settings, arg):
            settings_service.set(arg, values[arg])
        logger.debug(f"Loading config from cli parameter '{arg}': '{values[arg]}'")

    # From here on use the effective values held by the settings service.
    host = settings_service.settings.host
    port = settings_service.settings.port
    workers = settings_service.settings.workers
    worker_timeout = settings_service.settings.worker_timeout
    log_level = settings_service.settings.log_level
    frontend_path = settings_service.settings.frontend_path
    backend_only = settings_service.settings.backend_only

    # create path object if frontend_path is provided
    static_files_dir: Path | None = Path(frontend_path) if frontend_path else None

    # Local name intentionally shadows the module-level Typer `app` inside this function.
    app = setup_app(static_files_dir=static_files_dir, backend_only=backend_only)
    # check if port is being used
    if is_port_in_use(port, host):
        port = get_free_port(port)

    options = {
        "bind": f"{host}:{port}",
        "workers": get_number_of_workers(workers),
        "timeout": worker_timeout,
    }

    # When running under pytest, stop here so that no server is actually started.
    if "pytest" in sys.modules:
        return

    process: Process | None = None
    try:
        if platform.system() == "Windows":
            # Windows runs the server in this process (run_langflow uses uvicorn there).
            run_on_windows(host, port, log_level, options, app)
        else:
            # Everything else starts the server in a child process.
            process = run_on_mac_or_linux(host, port, log_level, options, app)
        if open_browser and not backend_only:
            click.launch(f"http://{host}:{port}")
        if process:
            process.join()
    except (KeyboardInterrupt, SystemExit) as e:
        # NOTE(review): the logging call was truncated in the source; info level is assumed.
        logger.info("Shutting down server...")
        if process is not None:
            process.terminate()
            process.join(timeout=15)  # Wait up to 15 seconds for process to terminate
            if process.is_alive():
                logger.warning("Process did not terminate gracefully, forcing...")
                process.kill()
        raise typer.Exit(0) from e
    except Exception as e:
        logger.exception(e)
        if process is not None:
            process.terminate()
        raise typer.Exit(1) from e
def wait_for_server_ready(host, port) -> None:
    """Wait for the server to become ready by polling the health endpoint.

    Issues ``GET http://{host}:{port}/health`` once per second until the
    response status is 200. ``httpx.HTTPError`` is retried silently; any other
    exception is logged at debug level and retried. There is no overall
    timeout, so this blocks for as long as the server stays unhealthy.
    """
    health_url = f"http://{host}:{port}/health"
    while True:
        try:
            if httpx.get(health_url).status_code == httpx.codes.OK:
                return
        except HTTPError:
            # Server is not reachable yet; fall through to the sleep and retry.
            pass
        except Exception:  # noqa: BLE001
            logger.opt(exception=True).debug("Error while waiting for the server to become ready.")
        # Sleep on every unsuccessful attempt, including non-200 responses,
        # so the loop never busy-polls the server.
        time.sleep(1)
def run_on_mac_or_linux(host, port, log_level, options, app):
    """Start the server in a child process and return that process.

    The child runs ``run_langflow``. This function blocks until the health
    endpoint reports ready, prints the banner, and hands the started
    ``Process`` back so the caller can join or terminate it.
    """
    server_args = (host, port, log_level, options, app)
    server_process = Process(target=run_langflow, args=server_args)
    server_process.start()
    # Only show the banner once the server actually answers.
    wait_for_server_ready(host, port)
    print_banner(host, port)
    return server_process
def run_on_windows(host, port, log_level, options, app) -> None:
    """Run the Langflow server on Windows.

    Prints the banner first, then runs the server in the current process;
    the ``run_langflow`` call only returns when the server stops.
    """
    print_banner(host=host, port=port)
    run_langflow(host=host, port=port, log_level=log_level, options=options, app=app)
def is_port_in_use(port, host="localhost"):
    """Check if a port is in use.

    A TCP (IPv4) connection attempt is made to ``(host, port)``; the port
    counts as "in use" when that connection succeeds.

    Args:
        port (int): The port number to check.
        host (str): The host to check the port on. Defaults to 'localhost'.

    Returns:
        bool: True if the port is in use, False otherwise.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 on success and an errno value otherwise.
        connect_result = probe.connect_ex((host, port))
    finally:
        probe.close()
    return connect_result == 0
def get_free_port(port, host="localhost"):
    """Given a used port, find a free port.

    Probes ``port``, ``port + 1``, ... on ``host`` and returns the first one
    that does not accept a connection.

    Args:
        port (int): The port number to start checking from.
        host (str): The host to probe. Defaults to 'localhost', which matches
            the behaviour of calling this function with a port only.

    Returns:
        int: A free port number.
    """
    while is_port_in_use(port, host):
        port += 1
    return port
def get_letter_from_version(version: str) -> str | None: | |
"""Get the letter from a pre-release version.""" | |
if "a" in version: | |
return "a" | |
if "b" in version: | |
return "b" | |
if "rc" in version: | |
return "rc" | |
return None | |
def build_version_notice(current_version: str, package_name: str) -> str:
    """Return an "update available" sentence, or ``""`` when none applies.

    The latest version is fetched for ``package_name`` (pre-releases are
    included only when ``current_version`` is itself a pre-release). A notice
    is produced only when a latest version was found and it is strictly newer
    than ``current_version``.
    """
    include_prerelease = langflow_is_pre_release(current_version)
    latest_version = fetch_latest_version(package_name, include_prerelease=include_prerelease)
    if not latest_version:
        return ""

    update_available = pkg_version.parse(current_version) < pkg_version.parse(latest_version)
    if not update_available:
        return ""

    if langflow_is_pre_release(latest_version):
        release_type = "pre-release"
    else:
        release_type = "version"
    return f"A new {release_type} of {package_name} is available: {latest_version}"
def generate_pip_command(package_names, is_pre_release) -> str:
    """Generate the pip install command based on the packages and whether it's a pre-release.

    The package names are joined with single spaces and followed by ``-U``;
    ``--pre`` is appended when ``is_pre_release`` is truthy.
    """
    parts = ["pip install", " ".join(package_names), "-U"]
    if is_pre_release:
        parts.append("--pre")
    return " ".join(parts)
def stylize_text(text: str, to_style: str, *, is_prerelease: bool) -> str:
    """Wrap every occurrence of ``to_style`` in ``text`` with a rich colour tag.

    The colour is ``#42a7f5`` for pre-releases and ``#6e42f5`` otherwise.
    ``text`` is returned unchanged when ``to_style`` does not occur in it.
    """
    if is_prerelease:
        color = "#42a7f5"
    else:
        color = "#6e42f5"
    replacement = "[" + color + "]" + to_style + "[/]"
    return text.replace(to_style, replacement)
def print_banner(host: str, port: int) -> None:
    """Print the welcome panel shown once the server is up.

    The panel contains the package name, an optional "new version available"
    notice followed by the matching pip command, a link to the GitHub
    repository, the telemetry notice and the URL the server is reachable at.
    """
    notices = []
    package_names = []  # Track package names for pip install instructions

    # Use langflow.utils.version to get the version info
    version_info = get_version_info()
    langflow_version = version_info["version"]
    package_name = version_info["package"]
    is_pre_release = langflow_is_pre_release(langflow_version)

    notice = build_version_notice(langflow_version, package_name)
    notice = stylize_text(notice, package_name, is_prerelease=is_pre_release)
    if notice:
        notices.append(notice)
    package_names.append(package_name)

    # Generate pip command based on the collected data
    pip_command = generate_pip_command(package_names, is_pre_release)

    # Add pip install command to notices if any package needs an update
    if notices:
        notices.append(f"Run '{pip_command}' to update.")

    styled_notices = [f"[bold]{notice}[/bold]" for notice in notices if notice]
    styled_package_name = stylize_text(
        package_name, package_name, is_prerelease=any("pre-release" in notice for notice in notices)
    )

    title = f"[bold]Welcome to :chains: {styled_package_name}[/bold]\n"
    # NOTE(review): the link target was empty in the source ("[link=]");
    # restored to the public Langflow repository - confirm the URL.
    info_text = (
        "Collaborate, and contribute at our "
        "[bold][link=https://github.com/langflow-ai/langflow]GitHub Repo[/link][/bold] :star2:"
    )
    telemetry_text = (
        "We collect anonymous usage data to improve Langflow.\n"
        "You can opt-out by setting [bold]DO_NOT_TRACK=true[/bold] in your environment."
    )
    access_link = f"Access [link=http://{host}:{port}]http://{host}:{port}[/link]"

    panel_content = "\n\n".join([title, *styled_notices, info_text, telemetry_text, access_link])
    panel = Panel(panel_content, box=box.ROUNDED, border_style="blue", expand=False)
    rprint(panel)
def run_langflow(host, port, log_level, options, app) -> None:
    """Run Langflow server on localhost.

    On Windows the app is served with uvicorn using ``host``, ``port`` and
    ``log_level``. On every other platform it is served through
    ``LangflowApplication(app, options)`` with SIGTERM/SIGINT handlers that
    end the process with exit status 0.
    """
    if platform.system() == "Windows":
        import uvicorn

        uvicorn.run(
            app,
            host=host,
            port=port,
            log_level=log_level.lower(),
            loop="asyncio",
        )
    else:
        from langflow.server import LangflowApplication

        server = LangflowApplication(app, options)

        def graceful_shutdown(signum, frame):  # noqa: ARG001
            """Gracefully shutdown the server when receiving SIGTERM."""
            # Suppress click exceptions during shutdown
            import click

            click.echo = lambda *args, **kwargs: None  # noqa: ARG005

            # NOTE(review): the logging call was truncated in the source; info level is assumed.
            logger.info("Gracefully shutting down server...")
            # For Gunicorn workers, we raise SystemExit to trigger graceful shutdown
            raise SystemExit(0)

        # Register signal handlers
        signal.signal(signal.SIGTERM, graceful_shutdown)
        signal.signal(signal.SIGINT, graceful_shutdown)

        try:
            # NOTE(review): the statement inside this `try` was missing from the
            # source; starting the server via `server.run()` is assumed - confirm.
            server.run()
        except (KeyboardInterrupt, SystemExit):
            # Suppress the exception output
            sys.exit(0)
# Creates a superuser and then that user's default folder, inside one async DB session.
# Registered on `app` so that `app()` in main() can dispatch to it.
@app.command()
def superuser(
    username: str = typer.Option(..., prompt=True, help="Username for the superuser."),
    password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."),
    log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
) -> None:
    """Create a superuser."""
    configure(log_level=log_level)
    db_service = get_db_service()

    async def _create_superuser():
        await initialize_services()
        async with async_session_getter(db_service) as session:
            # NOTE(review): the module path was missing from the source text;
            # `langflow.services.auth.utils` is the presumed location - confirm.
            from langflow.services.auth.utils import create_super_user

            if await create_super_user(db=session, username=username, password=password):
                # Verify that the superuser was created
                # NOTE(review): module path presumed (missing from the source) - confirm.
                from langflow.services.database.models.user.model import User

                stmt = select(User).where(User.username == username)
                user: User = (await session.exec(stmt)).first()
                if user is None or not user.is_superuser:
                    typer.echo("Superuser creation failed.")
                    return
                # Now create the first folder for the user
                # NOTE(review): the second argument was missing from the source;
                # the new user's id is assumed - confirm.
                result = await create_default_folder_if_it_doesnt_exist(session, user.id)
                if result:
                    typer.echo("Default folder created successfully.")
                else:
                    msg = "Could not create default folder."
                    raise RuntimeError(msg)
                typer.echo("Superuser created successfully.")
            else:
                typer.echo("Superuser creation failed.")

    # The coroutine above is only defined; it has to be driven to completion here.
    asyncio.run(_create_superuser())
# Command to copy the Langflow database files out of the user cache directory,
# because the database is now stored per installation.
# Registered on `app` so that `app()` in main() can dispatch to it.
@app.command()
def copy_db() -> None:
    """Copy the database files from the cache directory into this package's directory.

    This function looks for the 'langflow.db' and 'langflow-pre.db' files in the
    user cache directory. Each file that exists is copied into the directory
    that contains this module; a message is printed for each file either way.

    Returns:
        None
    """
    import shutil

    from platformdirs import user_cache_dir

    cache_dir = Path(user_cache_dir("langflow"))
    # The destination is the directory this file lives in, not the process's working directory.
    destination_folder = Path(__file__).parent

    candidates = (
        ("Database", cache_dir / "langflow.db"),
        ("Pre-release database", cache_dir / "langflow-pre.db"),
    )
    for label, source_path in candidates:
        if source_path.exists():
            shutil.copy(source_path, destination_folder)
            typer.echo(f"{label} copied to {destination_folder}")
        else:
            typer.echo(f"{label} not found in the cache directory.")
# With --no-test the migrations are applied first; the migration test report is
# always produced and printed afterwards.
# Registered on `app` so that `app()` in main() can dispatch to it.
@app.command()
def migration(
    test: bool = typer.Option(default=True, help="Run migrations in test mode."),  # noqa: FBT001
    fix: bool = typer.Option(  # noqa: FBT001
        default=False,
        help="Fix migrations. This is a destructive operation, and should only be used if you know what you are doing.",
    ),
) -> None:
    """Run or test migrations."""
    if fix and not typer.confirm(
        "This will delete all data necessary to fix migrations. Are you sure you want to continue?"
    ):
        raise typer.Abort

    # NOTE(review): the statement at this position was lost from the source, which
    # left `fix` without any effect after the confirmation. Initialising the
    # services with `fix_migration=fix` is the presumed original - confirm the
    # keyword name against `initialize_services`.
    asyncio.run(initialize_services(fix_migration=fix))
    db_service = get_db_service()
    if not test:
        db_service.run_migrations()
    results = db_service.run_migrations_test()
    display_results(results)
# Replaces any existing API key of the default superuser with a fresh one named "CLI".
# Registered on `app` so that `app()` in main() can dispatch to it.
@app.command()
def api_key(
    log_level: str = typer.Option("error", help="Logging level."),
) -> None:
    """Creates an API key for the default superuser if AUTO_LOGIN is enabled.

    Args:
        log_level (str, optional): Logging level. Defaults to "error".

    Returns:
        None
    """
    configure(log_level=log_level)

    async def aapi_key():
        await initialize_services()
        settings_service = get_settings_service()
        auth_settings = settings_service.auth_settings
        if not auth_settings.AUTO_LOGIN:
            typer.echo("Auto login is disabled. API keys cannot be created through the CLI.")
            return None

        async with async_session_scope() as session:
            # NOTE(review): the module paths of the three local imports below were
            # missing from the source text; the paths used are presumed - confirm.
            from langflow.services.database.models.user.model import User

            stmt = select(User).where(User.username == DEFAULT_SUPERUSER)
            default_superuser = (await session.exec(stmt)).first()
            if not default_superuser:
                typer.echo(
                    "Default superuser not found. This command requires a superuser and AUTO_LOGIN to be enabled."
                )
                return None
            from langflow.services.database.models.api_key import ApiKey, ApiKeyCreate
            from langflow.services.database.models.api_key.crud import create_api_key, delete_api_key

            # NOTE(review): the `.id` operands below were missing from the source;
            # the superuser's / existing key's ids are assumed - confirm.
            stmt = select(ApiKey).where(ApiKey.user_id == default_superuser.id)
            existing_key = (await session.exec(stmt)).first()
            if existing_key:
                await delete_api_key(session, existing_key.id)

            api_key_create = ApiKeyCreate(name="CLI")
            unmasked_api_key = await create_api_key(session, api_key_create, user_id=default_superuser.id)
            await session.commit()
            return unmasked_api_key

    unmasked_api_key = asyncio.run(aapi_key())
    # The coroutine returns None after printing an explanation when no key could
    # be created; there is nothing to show in that case.
    if unmasked_api_key is None:
        return
    # Create a banner to display the API key and tell the user it won't be shown again
    api_key_banner(unmasked_api_key)
def show_version(*, value: bool):
    """Print ``langflow <version>`` and exit when ``value`` is truthy.

    The version comes from ``get_version_info()``; ``"DEV"`` is shown when
    no version information is available. Does nothing when ``value`` is falsy.
    """
    if not value:
        return
    fallback = "DEV"
    info = get_version_info()
    shown_version = info.get("version", fallback) if info else fallback
    typer.echo(f"langflow {shown_version}")
    raise typer.Exit
# Application-level callback that only exists to declare the eager --version / -v
# option; the work happens in `show_version`. It is registered on `app` so the
# option is reachable. The body stays docstring-free on purpose: Typer would show
# a callback docstring as the application's help text.
@app.callback()
def version_option(
    *,
    version: bool = typer.Option(
        None,
        "--version",
        "-v",
        callback=show_version,
        is_eager=True,
        help="Show the version and exit.",
    ),
):
    pass
def api_key_banner(unmasked_api_key) -> None:
    """Show the newly created API key in a panel and try to copy it to the clipboard.

    Args:
        unmasked_api_key: Object whose ``api_key`` attribute holds the key text.

    The key is always displayed. If the clipboard is unavailable the panel
    says so instead of claiming the key was copied.
    """
    is_mac = platform.system() == "Darwin"
    import pyperclip

    try:
        pyperclip.copy(unmasked_api_key.api_key)
    except pyperclip.PyperclipException:
        # No clipboard mechanism available: the key has already been created and
        # this is the only time it is shown, so it must still be displayed.
        clipboard_note = "The API key could not be copied to your clipboard. Copy it from above."
    else:
        paste_modifier = "Cmd" if is_mac else "Ctrl"
        clipboard_note = (
            f"The API key has been copied to your clipboard. [bold]{paste_modifier} + V[/bold] to paste it."
        )

    panel = Panel(
        f"[bold]API Key Created Successfully:[/bold]\n\n"
        f"[bold blue]{unmasked_api_key.api_key}[/bold blue]\n\n"
        "This is the only time the API key will be displayed. \n"
        "Make sure to store it in a secure location. \n\n"
        f"{clipboard_note}",
        box=box.ROUNDED,
        border_style="blue",
        expand=False,
    )
    console = Console()
    console.print(panel)
def main() -> None:
    """Invoke the Typer application with all Python warnings suppressed."""
    with warnings.catch_warnings():
        # The "ignore" filter only applies inside this context manager.
        warnings.simplefilter(action="ignore")
        app()
# Script entry point: run the CLI; any uncaught exception is logged with its
# traceback and converted into exit status 1.
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.exception(e)
        raise typer.Exit(1) from e