import json
import shutil
import time
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from uuid import UUID

import orjson
from aiofile import async_open
from emoji import demojize, purely_emoji
from loguru import logger
from sqlalchemy.exc import NoResultFound
from sqlmodel import select

from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS
from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME
from langflow.services.auth.utils import create_super_user
from langflow.services.database.models.flow.model import Flow, FlowCreate
from langflow.services.database.models.folder.model import Folder, FolderCreate
from langflow.services.database.models.folder.utils import (
create_default_folder_if_it_doesnt_exist,
get_default_folder_id,
)
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.deps import (
async_session_scope,
get_settings_service,
get_storage_service,
get_variable_service,
session_scope,
)
from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES
from langflow.utils.util import escape_json_dump


# In the folder ./starter_projects we have a few JSON files that represent
# starter projects. We want to load these into the database so that users
# can use them as a starting point for their own projects.
def update_projects_components_with_latest_component_versions(project_data, all_types_dict):
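    """Sync the nodes of a starter project with the latest component definitions.

    For every node whose type exists in ``all_types_dict``, update its code,
    outputs, and template fields to the latest versions, logging each change.
    Nodes in tool mode or with a ``Tool`` output are left untouched. Returns an
    updated deep copy of ``project_data``.
    """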
# Flatten the all_types_dict for easy access
all_types_dict_flat = {}
for category in all_types_dict.values():
for key, component in category.items():
all_types_dict_flat[key] = component # noqa: PERF403
node_changes_log = defaultdict(list)
project_data_copy = deepcopy(project_data)
for node in project_data_copy.get("nodes", []):
node_data = node.get("data").get("node")
node_type = node.get("data").get("type")
# Skip updating if tool_mode is True
if node_data.get("tool_mode", False):
continue
        # Skip nodes that expose a Tool-typed output
        # NOTE: accounts for the Simple Agent having dynamic outputs
if any(output.get("types") == ["Tool"] for output in node_data.get("outputs", [])):
continue
if node_type in all_types_dict_flat:
latest_node = all_types_dict_flat.get(node_type)
latest_template = latest_node.get("template")
node_data["template"]["code"] = latest_template["code"]
if "outputs" in latest_node:
node_data["outputs"] = latest_node["outputs"]
if node_data["template"]["_type"] != latest_template["_type"]:
node_data["template"]["_type"] = latest_template["_type"]
if node_type != "Prompt":
node_data["template"] = latest_template
else:
for key, value in latest_template.items():
if key not in node_data["template"]:
node_changes_log[node_type].append(
{
"attr": key,
"old_value": None,
"new_value": value,
}
)
node_data["template"][key] = value
elif isinstance(value, dict) and value.get("value"):
node_changes_log[node_type].append(
{
"attr": key,
"old_value": node_data["template"][key],
"new_value": value,
}
)
node_data["template"][key]["value"] = value["value"]
for key in node_data["template"]:
if key not in latest_template:
node_data["template"][key]["input_types"] = DEFAULT_PROMPT_INTUT_TYPES
node_changes_log[node_type].append(
{
"attr": "_type",
"old_value": node_data["template"]["_type"],
"new_value": latest_template["_type"],
}
)
else:
for attr in NODE_FORMAT_ATTRIBUTES:
if (
attr in latest_node
# Check if it needs to be updated
and latest_node[attr] != node_data.get(attr)
):
node_changes_log[node_type].append(
{
"attr": attr,
"old_value": node_data.get(attr),
"new_value": latest_node[attr],
}
)
node_data[attr] = latest_node[attr]
for field_name, field_dict in latest_template.items():
if field_name not in node_data["template"]:
node_data["template"][field_name] = field_dict
continue
# The idea here is to update some attributes of the field
to_check_attributes = FIELD_FORMAT_ATTRIBUTES
for attr in to_check_attributes:
if (
attr in field_dict
and attr in node_data["template"].get(field_name)
# Check if it needs to be updated
and field_dict[attr] != node_data["template"][field_name][attr]
):
node_changes_log[node_type].append(
{
"attr": f"{field_name}.{attr}",
"old_value": node_data["template"][field_name][attr],
"new_value": field_dict[attr],
}
)
node_data["template"][field_name][attr] = field_dict[attr]
# Remove fields that are not in the latest template
if node_type != "Prompt":
for field_name in list(node_data["template"].keys()):
if field_name not in latest_template:
node_data["template"].pop(field_name)
log_node_changes(node_changes_log)
return project_data_copy


def scape_json_parse(json_string: str | dict) -> dict:
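    """Parse a JSON string that uses ``œ`` as a placeholder for double quotes.

    Returns the input unchanged if it is already a dict.
    """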
if isinstance(json_string, dict):
return json_string
parsed_string = json_string.replace("œ", '"')
return json.loads(parsed_string)


def update_new_output(data):
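    """Migrate edges from the legacy ``baseClasses`` handle format to ``output_types``.

    Rewrites each edge's source and target handles, backfills an ``outputs`` list
    on source nodes that lack one, and then syncs handle names with the node
    outputs. Returns an updated deep copy of ``data``.
    """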
    nodes = deepcopy(data["nodes"])
    edges = deepcopy(data["edges"])
for edge in edges:
if "sourceHandle" in edge and "targetHandle" in edge:
new_source_handle = scape_json_parse(edge["sourceHandle"])
new_target_handle = scape_json_parse(edge["targetHandle"])
id_ = new_source_handle["id"]
source_node_index = next((index for (index, d) in enumerate(nodes) if d["id"] == id_), -1)
source_node = nodes[source_node_index] if source_node_index != -1 else None
if "baseClasses" in new_source_handle:
if "output_types" not in new_source_handle:
if source_node and "node" in source_node["data"] and "output_types" in source_node["data"]["node"]:
new_source_handle["output_types"] = source_node["data"]["node"]["output_types"]
else:
new_source_handle["output_types"] = new_source_handle["baseClasses"]
del new_source_handle["baseClasses"]
if new_target_handle.get("inputTypes"):
intersection = [
type_ for type_ in new_source_handle["output_types"] if type_ in new_target_handle["inputTypes"]
]
else:
intersection = [
type_ for type_ in new_source_handle["output_types"] if type_ == new_target_handle["type"]
]
selected = intersection[0] if intersection else None
if "name" not in new_source_handle:
new_source_handle["name"] = " | ".join(new_source_handle["output_types"])
new_source_handle["output_types"] = [selected] if selected else []
if source_node and not source_node["data"]["node"].get("outputs"):
if "outputs" not in source_node["data"]["node"]:
source_node["data"]["node"]["outputs"] = []
types = source_node["data"]["node"].get(
"output_types", source_node["data"]["node"].get("base_classes", [])
)
if not any(output.get("selected") == selected for output in source_node["data"]["node"]["outputs"]):
source_node["data"]["node"]["outputs"].append(
{
"types": types,
"selected": selected,
"name": " | ".join(types),
"display_name": " | ".join(types),
}
)
            # Deduplicate outputs by name, skipping edges whose source node was not found
            if source_node is not None:
                deduplicated_outputs = []
                for output in source_node["data"]["node"]["outputs"]:
                    if output["name"] not in [d["name"] for d in deduplicated_outputs]:
                        deduplicated_outputs.append(output)
                source_node["data"]["node"]["outputs"] = deduplicated_outputs
edge["sourceHandle"] = escape_json_dump(new_source_handle)
edge["data"]["sourceHandle"] = new_source_handle
edge["data"]["targetHandle"] = new_target_handle
    # The loop above sets the edges, but some sourceHandles still lack a valid
    # name, which can be found in the nodes. Update each sourceHandle with the
    # name from node['data']['node']['outputs'].
for node in nodes:
if "outputs" in node["data"]["node"]:
for output in node["data"]["node"]["outputs"]:
for edge in edges:
if node["id"] != edge["source"] or output.get("method") is None:
continue
source_handle = scape_json_parse(edge["sourceHandle"])
if source_handle["output_types"] == output.get("types") and source_handle["name"] != output["name"]:
source_handle["name"] = output["name"]
if isinstance(source_handle, str):
source_handle = scape_json_parse(source_handle)
edge["sourceHandle"] = escape_json_dump(source_handle)
edge["data"]["sourceHandle"] = source_handle
    data_copy = deepcopy(data)
data_copy["nodes"] = nodes
data_copy["edges"] = edges
return data_copy


def update_edges_with_latest_component_versions(project_data):
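    """Refresh edge handles so they match the already-updated node definitions.

    Recomputes each edge's ``output_types`` and ``inputTypes`` from the connected
    source and target nodes, logging every change. Returns an updated deep copy
    of ``project_data``.
    """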
edge_changes_log = defaultdict(list)
project_data_copy = deepcopy(project_data)
for edge in project_data_copy.get("edges", []):
source_handle = edge.get("data").get("sourceHandle")
source_handle = scape_json_parse(source_handle)
target_handle = edge.get("data").get("targetHandle")
target_handle = scape_json_parse(target_handle)
# Now find the source and target nodes in the nodes list
source_node = next(
(node for node in project_data.get("nodes", []) if node.get("id") == edge.get("source")),
None,
)
target_node = next(
(node for node in project_data.get("nodes", []) if node.get("id") == edge.get("target")),
None,
)
if source_node and target_node:
source_node_data = source_node.get("data").get("node")
target_node_data = target_node.get("data").get("node")
output_data = next(
(output for output in source_node_data.get("outputs", []) if output["name"] == source_handle["name"]),
None,
)
if not output_data:
output_data = next(
(
output
for output in source_node_data.get("outputs", [])
if output["display_name"] == source_handle["name"]
),
None,
)
if output_data:
source_handle["name"] = output_data["name"]
if output_data:
if len(output_data.get("types")) == 1:
new_output_types = output_data.get("types")
elif output_data.get("selected"):
new_output_types = [output_data.get("selected")]
else:
new_output_types = []
else:
new_output_types = []
if source_handle["output_types"] != new_output_types:
edge_changes_log[source_node_data["display_name"]].append(
{
"attr": "output_types",
"old_value": source_handle["output_types"],
"new_value": new_output_types,
}
)
source_handle["output_types"] = new_output_types
field_name = target_handle.get("fieldName")
if field_name in target_node_data.get("template") and target_handle["inputTypes"] != target_node_data.get(
"template"
).get(field_name).get("input_types"):
edge_changes_log[target_node_data["display_name"]].append(
{
"attr": "inputTypes",
"old_value": target_handle["inputTypes"],
"new_value": target_node_data.get("template").get(field_name).get("input_types"),
}
)
target_handle["inputTypes"] = target_node_data.get("template").get(field_name).get("input_types")
escaped_source_handle = escape_json_dump(source_handle)
escaped_target_handle = escape_json_dump(target_handle)
try:
old_escape_source_handle = escape_json_dump(json.loads(edge["sourceHandle"]))
except json.JSONDecodeError:
old_escape_source_handle = edge["sourceHandle"]
try:
old_escape_target_handle = escape_json_dump(json.loads(edge["targetHandle"]))
except json.JSONDecodeError:
old_escape_target_handle = edge["targetHandle"]
if old_escape_source_handle != escaped_source_handle:
edge_changes_log[source_node_data["display_name"]].append(
{
"attr": "sourceHandle",
"old_value": old_escape_source_handle,
"new_value": escaped_source_handle,
}
)
edge["sourceHandle"] = escaped_source_handle
if old_escape_target_handle != escaped_target_handle:
edge_changes_log[target_node_data["display_name"]].append(
{
"attr": "targetHandle",
"old_value": old_escape_target_handle,
"new_value": escaped_target_handle,
}
)
edge["targetHandle"] = escaped_target_handle
else:
logger.error(f"Source or target node not found for edge: {edge}")
log_node_changes(edge_changes_log)
return project_data_copy


def log_node_changes(node_changes_log) -> None:
    """Log the recorded node changes at debug level, one message per node, e.g.:

    Node: "Node Name" was updated with the following changes:
    - attr_name: old_value -> new_value
    """
formatted_messages = []
for node_name, changes in node_changes_log.items():
message = f"\nNode: {node_name} was updated with the following changes:"
for change in changes:
message += f"\n- {change['attr']}: {change['old_value']} -> {change['new_value']}"
formatted_messages.append(message)
if formatted_messages:
logger.debug("\n".join(formatted_messages))


def load_starter_projects(retries: int = 3, delay: float = 1) -> list[tuple[Path, dict]]:
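    """Load the starter project JSON files shipped in ./starter_projects.

    Each file is parsed with up to ``retries`` attempts, waiting ``delay`` seconds
    between attempts, and a ``ValueError`` is raised if parsing keeps failing.
    Returns a list of ``(path, project_dict)`` tuples.
    """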
starter_projects = []
folder = Path(__file__).parent / "starter_projects"
for file in folder.glob("*.json"):
attempt = 0
while attempt < retries:
content = file.read_text(encoding="utf-8")
try:
project = orjson.loads(content)
starter_projects.append((file, project))
logger.info(f"Loaded starter project {file}")
break # Break if load is successful
except orjson.JSONDecodeError as e:
attempt += 1
if attempt >= retries:
msg = f"Error loading starter project {file}: {e}"
raise ValueError(msg) from e
time.sleep(delay) # Wait before retrying
return starter_projects


def copy_profile_pictures() -> None:
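    """Copy the bundled profile pictures into the configured ``config_dir``."""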
config_dir = get_storage_service().settings_service.settings.config_dir
if config_dir is None:
msg = "Config dir is not set in the settings"
raise ValueError(msg)
origin = Path(__file__).parent / "profile_pictures"
target = Path(config_dir) / "profile_pictures"
if not origin.exists():
msg = f"The source folder '{origin}' does not exist."
raise ValueError(msg)
    target.mkdir(parents=True, exist_ok=True)
try:
shutil.copytree(origin, target, dirs_exist_ok=True)
logger.debug(f"Folder copied from '{origin}' to '{target}'")
except Exception: # noqa: BLE001
logger.exception("Error copying the folder")


def get_project_data(project):
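    """Extract the flow attributes stored in a starter project dict.

    Emoji icons are demojized (converted to ``:emoji_name:`` form) and a missing
    ``updated_at`` falls back to the current UTC time.
    """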
project_name = project.get("name")
project_description = project.get("description")
project_is_component = project.get("is_component")
project_updated_at = project.get("updated_at")
if not project_updated_at:
updated_at_datetime = datetime.now(tz=timezone.utc)
else:
updated_at_datetime = datetime.fromisoformat(project_updated_at)
project_data = project.get("data")
project_icon = project.get("icon")
project_icon = demojize(project_icon) if project_icon and purely_emoji(project_icon) else project_icon
project_icon_bg_color = project.get("icon_bg_color")
project_gradient = project.get("gradient")
project_tags = project.get("tags")
return (
project_name,
project_description,
project_is_component,
updated_at_datetime,
project_data,
project_icon,
project_icon_bg_color,
project_gradient,
project_tags,
)


def update_project_file(project_path: Path, project: dict, updated_project_data) -> None:
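    """Write the updated flow data back to the starter project JSON file."""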
project["data"] = updated_project_data
project_path.write_text(orjson.dumps(project, option=ORJSON_OPTIONS).decode(), encoding="utf-8")
logger.info(f"Updated starter project {project['name']} file")


def update_existing_project(
existing_project,
project_name,
project_description,
project_is_component,
updated_at_datetime,
project_data,
project_icon,
project_icon_bg_color,
) -> None:
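    """Overwrite an existing starter project flow with freshly loaded data."""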
logger.info(f"Updating starter project {project_name}")
existing_project.data = project_data
existing_project.folder = STARTER_FOLDER_NAME
existing_project.description = project_description
existing_project.is_component = project_is_component
existing_project.updated_at = updated_at_datetime
existing_project.icon = project_icon
existing_project.icon_bg_color = project_icon_bg_color


def create_new_project(
session,
project_name,
project_description,
project_is_component,
updated_at_datetime,
project_data,
project_gradient,
project_tags,
project_icon,
project_icon_bg_color,
new_folder_id,
) -> None:
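    """Add a new starter project flow to the session inside the given folder."""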
logger.debug(f"Creating starter project {project_name}")
new_project = FlowCreate(
name=project_name,
description=project_description,
icon=project_icon,
icon_bg_color=project_icon_bg_color,
data=project_data,
is_component=project_is_component,
updated_at=updated_at_datetime,
folder_id=new_folder_id,
gradient=project_gradient,
tags=project_tags,
)
db_flow = Flow.model_validate(new_project, from_attributes=True)
session.add(db_flow)


def get_all_flows_similar_to_project(session, folder_id):
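    """Return all flows that belong to the folder with the given ``folder_id``."""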
return session.exec(select(Folder).where(Folder.id == folder_id)).first().flows


def delete_start_projects(session, folder_id) -> None:
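    """Delete every flow in the starter projects folder so it can be rebuilt."""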
flows = session.exec(select(Folder).where(Folder.id == folder_id)).first().flows
for flow in flows:
session.delete(flow)
session.commit()


def folder_exists(session, folder_name):
folder = session.exec(select(Folder).where(Folder.name == folder_name)).first()
return folder is not None


def create_starter_folder(session):
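    """Return the starter projects folder, creating it if it does not exist."""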
if not folder_exists(session, STARTER_FOLDER_NAME):
new_folder = FolderCreate(name=STARTER_FOLDER_NAME, description=STARTER_FOLDER_DESCRIPTION)
db_folder = Folder.model_validate(new_folder, from_attributes=True)
session.add(db_folder)
session.commit()
session.refresh(db_folder)
return db_folder
return session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()


def _is_valid_uuid(val):
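    """Check whether ``val`` is the canonical string form of a UUID."""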
try:
uuid_obj = UUID(val)
except ValueError:
return False
return str(uuid_obj) == val


async def load_flows_from_directory() -> None:
"""On langflow startup, this loads all flows from the directory specified in the settings.
All flows are uploaded into the default folder for the superuser.
Note that this feature currently only works if AUTO_LOGIN is enabled in the settings.
"""
settings_service = get_settings_service()
flows_path = settings_service.settings.load_flows_path
if not flows_path:
return
if not settings_service.auth_settings.AUTO_LOGIN:
logger.warning("AUTO_LOGIN is disabled, not loading flows from directory")
return
async with async_session_scope() as session:
user = await get_user_by_username(session, settings_service.auth_settings.SUPERUSER)
if user is None:
msg = "Superuser not found in the database"
raise NoResultFound(msg)
user_id = user.id
flows_path_ = Path(flows_path)
files = [f for f in flows_path_.iterdir() if f.is_file()]
for file_path in files:
if file_path.suffix != ".json":
continue
logger.info(f"Loading flow from file: {file_path.name}")
async with async_open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
flow = orjson.loads(content)
no_json_name = file_path.stem
flow_endpoint_name = flow.get("endpoint_name")
if _is_valid_uuid(no_json_name):
flow["id"] = no_json_name
flow_id = flow.get("id")
existing = await find_existing_flow(session, flow_id, flow_endpoint_name)
if existing:
logger.debug(f"Found existing flow: {existing.name}")
logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
for key, value in flow.items():
if hasattr(existing, key):
# flow dict from json and db representation are not 100% the same
setattr(existing, key, value)
existing.updated_at = datetime.now(tz=timezone.utc).astimezone()
existing.user_id = user_id
# Generally, folder_id should not be None, but we must check this due to the previous
# behavior where flows could be added and folder_id was None, orphaning
# them within Langflow.
if existing.folder_id is None:
folder_id = await get_default_folder_id(session, user_id)
existing.folder_id = folder_id
session.add(existing)
else:
logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
# Current behavior loads all new flows into default folder
folder_id = await get_default_folder_id(session, user_id)
flow["user_id"] = user_id
flow["folder_id"] = folder_id
flow = Flow.model_validate(flow, from_attributes=True)
flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
session.add(flow)


async def find_existing_flow(session, flow_id, flow_endpoint_name):
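    """Find a flow by endpoint name first, then by id; return None if neither matches."""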
if flow_endpoint_name:
logger.debug(f"flow_endpoint_name: {flow_endpoint_name}")
stmt = select(Flow).where(Flow.endpoint_name == flow_endpoint_name)
if existing := (await session.exec(stmt)).first():
logger.debug(f"Found existing flow by endpoint name: {existing.name}")
return existing
stmt = select(Flow).where(Flow.id == flow_id)
if existing := (await session.exec(stmt)).first():
logger.debug(f"Found existing flow by id: {flow_id}")
return existing
return None


def create_or_update_starter_projects(all_types_dict: dict) -> None:
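    """Create or refresh the starter projects in the database.

    Ensures the starter folder exists, deletes its current flows, updates each
    bundled project to the latest component versions (rewriting the JSON file on
    disk when it changed), and inserts the refreshed flows.
    """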
with session_scope() as session:
new_folder = create_starter_folder(session)
starter_projects = load_starter_projects()
delete_start_projects(session, new_folder.id)
copy_profile_pictures()
for project_path, project in starter_projects:
(
project_name,
project_description,
project_is_component,
updated_at_datetime,
project_data,
project_icon,
project_icon_bg_color,
project_gradient,
project_tags,
) = get_project_data(project)
updated_project_data = update_projects_components_with_latest_component_versions(
project_data.copy(), all_types_dict
)
updated_project_data = update_edges_with_latest_component_versions(updated_project_data)
if updated_project_data != project_data:
project_data = updated_project_data
# We also need to update the project data in the file
update_project_file(project_path, project, updated_project_data)
if project_name and project_data:
for existing_project in get_all_flows_similar_to_project(session, new_folder.id):
session.delete(existing_project)
create_new_project(
session=session,
project_name=project_name,
project_description=project_description,
project_is_component=project_is_component,
updated_at_datetime=updated_at_datetime,
project_data=project_data,
project_icon=project_icon,
project_icon_bg_color=project_icon_bg_color,
project_gradient=project_gradient,
project_tags=project_tags,
new_folder_id=new_folder.id,
)


async def initialize_super_user_if_needed() -> None:
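    """Create the superuser plus its default folder and variables when AUTO_LOGIN is enabled."""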
settings_service = get_settings_service()
if not settings_service.auth_settings.AUTO_LOGIN:
return
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
if not username or not password:
msg = "SUPERUSER and SUPERUSER_PASSWORD must be set in the settings if AUTO_LOGIN is true."
raise ValueError(msg)
async with async_session_scope() as async_session:
super_user = await create_super_user(db=async_session, username=username, password=password)
await get_variable_service().initialize_user_variables(super_user.id, async_session)
await create_default_folder_if_it_doesnt_exist(async_session, super_user.id)
logger.info("Super user initialized")