import copy
import json
import shutil
import time
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from uuid import UUID

import orjson
from aiofile import async_open
from emoji import demojize, purely_emoji
from loguru import logger
from sqlalchemy.exc import NoResultFound
from sqlmodel import select

from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS
from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME
from langflow.services.auth.utils import create_super_user
from langflow.services.database.models.flow.model import Flow, FlowCreate
from langflow.services.database.models.folder.model import Folder, FolderCreate
from langflow.services.database.models.folder.utils import (
    create_default_folder_if_it_doesnt_exist,
    get_default_folder_id,
)
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.deps import (
    async_session_scope,
    get_settings_service,
    get_storage_service,
    get_variable_service,
    session_scope,
)
from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES
from langflow.utils.util import escape_json_dump

# In the folder ./starter_projects we have a few JSON files that represent
# starter projects. We want to load these into the database so that users
# can use them as a starting point for their own projects.
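# The layout assumed next to this module is (file names illustrative):
#
#   initial_setup/
#       starter_projects/
#           Basic Prompting.json
#           ...
#       profile_pictures/
#           ...
#
# Each JSON file is a serialized flow: a dict with "name", "description",
# "data" (the nodes and edges), and optional presentation fields such as
# "icon", "icon_bg_color", "gradient", and "tags" (see get_project_data below).

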
def update_projects_components_with_latest_component_versions(project_data, all_types_dict):
    # Flatten the all_types_dict for easy access
    all_types_dict_flat = {}
    for category in all_types_dict.values():
        for key, component in category.items():
            all_types_dict_flat[key] = component  # noqa: PERF403
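    # Illustrative shape of the flattening above (category names assumed):
    #   {"inputs": {"ChatInput": {...}}, "outputs": {"ChatOutput": {...}}}
    #   -> {"ChatInput": {...}, "ChatOutput": {...}}
    # If two categories ever share a component key, the last one wins.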
    node_changes_log = defaultdict(list)
    project_data_copy = deepcopy(project_data)
    for node in project_data_copy.get("nodes", []):
        node_data = node.get("data").get("node")
        node_type = node.get("data").get("type")
        # Skip updating components that are in tool mode
        if node_data.get("tool_mode", False):
            continue

        # Skip nodes that expose a Tool-typed output; these are dynamic
        # (e.g. the Simple Agent) and must not be overwritten here
        if any(output.get("types") == ["Tool"] for output in node_data.get("outputs", [])):
            continue
        if node_type in all_types_dict_flat:
            latest_node = all_types_dict_flat.get(node_type)
            latest_template = latest_node.get("template")
            node_data["template"]["code"] = latest_template["code"]

            if "outputs" in latest_node:
                node_data["outputs"] = latest_node["outputs"]
            if node_data["template"]["_type"] != latest_template["_type"]:
                # Capture the old value before overwriting, so the change log
                # records the real transition instead of new -> new
                old_type = node_data["template"]["_type"]
                node_data["template"]["_type"] = latest_template["_type"]
                if node_type != "Prompt":
                    node_data["template"] = latest_template
                else:
                    for key, value in latest_template.items():
                        if key not in node_data["template"]:
                            node_changes_log[node_type].append(
                                {
                                    "attr": key,
                                    "old_value": None,
                                    "new_value": value,
                                }
                            )
                            node_data["template"][key] = value
                        elif isinstance(value, dict) and value.get("value"):
                            node_changes_log[node_type].append(
                                {
                                    "attr": key,
                                    "old_value": node_data["template"][key],
                                    "new_value": value,
                                }
                            )
                            node_data["template"][key]["value"] = value["value"]
                    for key in node_data["template"]:
                        if key not in latest_template:
                            node_data["template"][key]["input_types"] = DEFAULT_PROMPT_INTUT_TYPES
                node_changes_log[node_type].append(
                    {
                        "attr": "_type",
                        "old_value": old_type,
                        "new_value": latest_template["_type"],
                    }
                )
            else:
                for attr in NODE_FORMAT_ATTRIBUTES:
                    if (
                        attr in latest_node
                        # Check if the attribute needs to be updated
                        and latest_node[attr] != node_data.get(attr)
                    ):
                        node_changes_log[node_type].append(
                            {
                                "attr": attr,
                                "old_value": node_data.get(attr),
                                "new_value": latest_node[attr],
                            }
                        )
                        node_data[attr] = latest_node[attr]

                for field_name, field_dict in latest_template.items():
                    if field_name not in node_data["template"]:
                        node_data["template"][field_name] = field_dict
                        continue
                    # Update a fixed set of field attributes (FIELD_FORMAT_ATTRIBUTES)
                    # from the latest template, logging each change
                    to_check_attributes = FIELD_FORMAT_ATTRIBUTES
                    for attr in to_check_attributes:
                        if (
                            attr in field_dict
                            and attr in node_data["template"].get(field_name)
                            # Check if the attribute needs to be updated
                            and field_dict[attr] != node_data["template"][field_name][attr]
                        ):
                            node_changes_log[node_type].append(
                                {
                                    "attr": f"{field_name}.{attr}",
                                    "old_value": node_data["template"][field_name][attr],
                                    "new_value": field_dict[attr],
                                }
                            )
                            node_data["template"][field_name][attr] = field_dict[attr]

            # Remove fields that are not in the latest template
            if node_type != "Prompt":
                for field_name in list(node_data["template"].keys()):
                    if field_name not in latest_template:
                        node_data["template"].pop(field_name)
    log_node_changes(node_changes_log)
    return project_data_copy


def scape_json_parse(json_string: str) -> dict:
    if isinstance(json_string, dict):
        return json_string
    parsed_string = json_string.replace("œ", '"')
    return json.loads(parsed_string)
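# Illustrative round trip (hypothetical payload): the frontend serializes edge
# handles with "œ" standing in for double quotes, so
#     scape_json_parse("{œnameœ: œtext_outputœ}") == {"name": "text_output"}
# escape_json_dump goes the other way (dict -> œ-escaped string); the two are
# used together below to rewrite edge["sourceHandle"] / edge["targetHandle"].

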
def update_new_output(data):
    nodes = copy.deepcopy(data["nodes"])
    edges = copy.deepcopy(data["edges"])

    for edge in edges:
        if "sourceHandle" in edge and "targetHandle" in edge:
            new_source_handle = scape_json_parse(edge["sourceHandle"])
            new_target_handle = scape_json_parse(edge["targetHandle"])
            id_ = new_source_handle["id"]
            source_node_index = next((index for (index, d) in enumerate(nodes) if d["id"] == id_), -1)
            source_node = nodes[source_node_index] if source_node_index != -1 else None

            if "baseClasses" in new_source_handle:
                if "output_types" not in new_source_handle:
                    if source_node and "node" in source_node["data"] and "output_types" in source_node["data"]["node"]:
                        new_source_handle["output_types"] = source_node["data"]["node"]["output_types"]
                    else:
                        new_source_handle["output_types"] = new_source_handle["baseClasses"]
                del new_source_handle["baseClasses"]

            if new_target_handle.get("inputTypes"):
                intersection = [
                    type_ for type_ in new_source_handle["output_types"] if type_ in new_target_handle["inputTypes"]
                ]
            else:
                intersection = [
                    type_ for type_ in new_source_handle["output_types"] if type_ == new_target_handle["type"]
                ]

            selected = intersection[0] if intersection else None
            if "name" not in new_source_handle:
                new_source_handle["name"] = " | ".join(new_source_handle["output_types"])
            new_source_handle["output_types"] = [selected] if selected else []

            if source_node and not source_node["data"]["node"].get("outputs"):
                if "outputs" not in source_node["data"]["node"]:
                    source_node["data"]["node"]["outputs"] = []
                types = source_node["data"]["node"].get(
                    "output_types", source_node["data"]["node"].get("base_classes", [])
                )
                if not any(output.get("selected") == selected for output in source_node["data"]["node"]["outputs"]):
                    source_node["data"]["node"]["outputs"].append(
                        {
                            "types": types,
                            "selected": selected,
                            "name": " | ".join(types),
                            "display_name": " | ".join(types),
                        }
                    )

            deduplicated_outputs = []
            if source_node is None:
                source_node = {"data": {"node": {"outputs": []}}}
            for output in source_node["data"]["node"]["outputs"]:
                if output["name"] not in [d["name"] for d in deduplicated_outputs]:
                    deduplicated_outputs.append(output)
            source_node["data"]["node"]["outputs"] = deduplicated_outputs

            edge["sourceHandle"] = escape_json_dump(new_source_handle)
            edge["data"]["sourceHandle"] = new_source_handle
            edge["data"]["targetHandle"] = new_target_handle
    # The loop above rewrites the edges, but some sourceHandles still do not
    # have a valid name; that name can be recovered from the matching entry in
    # node["data"]["node"]["outputs"], so update the sourceHandle with it.
    for node in nodes:
        if "outputs" in node["data"]["node"]:
            for output in node["data"]["node"]["outputs"]:
                for edge in edges:
                    if node["id"] != edge["source"] or output.get("method") is None:
                        continue
                    source_handle = scape_json_parse(edge["sourceHandle"])
                    if source_handle["output_types"] == output.get("types") and source_handle["name"] != output["name"]:
                        source_handle["name"] = output["name"]
                        if isinstance(source_handle, str):
                            source_handle = scape_json_parse(source_handle)
                        edge["sourceHandle"] = escape_json_dump(source_handle)
                        edge["data"]["sourceHandle"] = source_handle

    data_copy = copy.deepcopy(data)
    data_copy["nodes"] = nodes
    data_copy["edges"] = edges
    return data_copy


def update_edges_with_latest_component_versions(project_data):
    edge_changes_log = defaultdict(list)
    project_data_copy = deepcopy(project_data)
    for edge in project_data_copy.get("edges", []):
        source_handle = edge.get("data").get("sourceHandle")
        source_handle = scape_json_parse(source_handle)
        target_handle = edge.get("data").get("targetHandle")
        target_handle = scape_json_parse(target_handle)
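        # Illustrative shapes after parsing (hypothetical values; only the
        # keys used below matter):
        #   source_handle: {"name": "text_output", "output_types": ["Message"], ...}
        #   target_handle: {"fieldName": "input_value", "inputTypes": ["Message"], ...}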
        # Now find the source and target nodes in the nodes list
        source_node = next(
            (node for node in project_data.get("nodes", []) if node.get("id") == edge.get("source")),
            None,
        )
        target_node = next(
            (node for node in project_data.get("nodes", []) if node.get("id") == edge.get("target")),
            None,
        )
        if source_node and target_node:
            source_node_data = source_node.get("data").get("node")
            target_node_data = target_node.get("data").get("node")

            output_data = next(
                (output for output in source_node_data.get("outputs", []) if output["name"] == source_handle["name"]),
                None,
            )
            if not output_data:
                output_data = next(
                    (
                        output
                        for output in source_node_data.get("outputs", [])
                        if output["display_name"] == source_handle["name"]
                    ),
                    None,
                )
                if output_data:
                    source_handle["name"] = output_data["name"]
            if output_data:
                if len(output_data.get("types")) == 1:
                    new_output_types = output_data.get("types")
                elif output_data.get("selected"):
                    new_output_types = [output_data.get("selected")]
                else:
                    new_output_types = []
            else:
                new_output_types = []

            if source_handle["output_types"] != new_output_types:
                edge_changes_log[source_node_data["display_name"]].append(
                    {
                        "attr": "output_types",
                        "old_value": source_handle["output_types"],
                        "new_value": new_output_types,
                    }
                )
                source_handle["output_types"] = new_output_types

            field_name = target_handle.get("fieldName")
            if field_name in target_node_data.get("template") and target_handle["inputTypes"] != target_node_data.get(
                "template"
            ).get(field_name).get("input_types"):
                edge_changes_log[target_node_data["display_name"]].append(
                    {
                        "attr": "inputTypes",
                        "old_value": target_handle["inputTypes"],
                        "new_value": target_node_data.get("template").get(field_name).get("input_types"),
                    }
                )
                target_handle["inputTypes"] = target_node_data.get("template").get(field_name).get("input_types")

            escaped_source_handle = escape_json_dump(source_handle)
            escaped_target_handle = escape_json_dump(target_handle)
            try:
                old_escape_source_handle = escape_json_dump(json.loads(edge["sourceHandle"]))
            except json.JSONDecodeError:
                old_escape_source_handle = edge["sourceHandle"]
            try:
                old_escape_target_handle = escape_json_dump(json.loads(edge["targetHandle"]))
            except json.JSONDecodeError:
                old_escape_target_handle = edge["targetHandle"]

            if old_escape_source_handle != escaped_source_handle:
                edge_changes_log[source_node_data["display_name"]].append(
                    {
                        "attr": "sourceHandle",
                        "old_value": old_escape_source_handle,
                        "new_value": escaped_source_handle,
                    }
                )
                edge["sourceHandle"] = escaped_source_handle
            if old_escape_target_handle != escaped_target_handle:
                edge_changes_log[target_node_data["display_name"]].append(
                    {
                        "attr": "targetHandle",
                        "old_value": old_escape_target_handle,
                        "new_value": escaped_target_handle,
                    }
                )
                edge["targetHandle"] = escaped_target_handle
        else:
            logger.error(f"Source or target node not found for edge: {edge}")

    log_node_changes(edge_changes_log)
    return project_data_copy

def log_node_changes(node_changes_log) -> None:
    # Log the changes made to the nodes at debug level, one entry per node:
    #   Node: "Node Name" was updated with the following changes:
    #   - attr_name: old_value -> new_value
    formatted_messages = []
    for node_name, changes in node_changes_log.items():
        message = f"\nNode: {node_name} was updated with the following changes:"
        for change in changes:
            message += f"\n- {change['attr']}: {change['old_value']} -> {change['new_value']}"
        formatted_messages.append(message)
    if formatted_messages:
        logger.debug("\n".join(formatted_messages))

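# Note on the retry loop below: a starter-project file may be rewritten while
# it is being read (update_project_file rewrites these files at startup, e.g.
# when several workers start at once), in which case orjson.loads raises a
# JSONDecodeError on the partially written content; the file is then re-read
# after `delay` seconds, up to `retries` attempts.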
def load_starter_projects(retries=3, delay=1) -> list[tuple[Path, dict]]:
    starter_projects = []
    folder = Path(__file__).parent / "starter_projects"
    for file in folder.glob("*.json"):
        attempt = 0
        while attempt < retries:
            content = file.read_text(encoding="utf-8")
            try:
                project = orjson.loads(content)
                starter_projects.append((file, project))
                logger.info(f"Loaded starter project {file}")
                break  # Break if load is successful
            except orjson.JSONDecodeError as e:
                attempt += 1
                if attempt >= retries:
                    msg = f"Error loading starter project {file}: {e}"
                    raise ValueError(msg) from e
                time.sleep(delay)  # Wait before retrying
    return starter_projects

def copy_profile_pictures() -> None:
    config_dir = get_storage_service().settings_service.settings.config_dir
    if config_dir is None:
        msg = "Config dir is not set in the settings"
        raise ValueError(msg)
    origin = Path(__file__).parent / "profile_pictures"
    target = Path(config_dir) / "profile_pictures"
    if not origin.exists():
        msg = f"The source folder '{origin}' does not exist."
        raise ValueError(msg)
    if not target.exists():
        target.mkdir(parents=True)
    try:
        shutil.copytree(origin, target, dirs_exist_ok=True)
        logger.debug(f"Folder copied from '{origin}' to '{target}'")
    except Exception:  # noqa: BLE001
        logger.exception("Error copying the folder")


def get_project_data(project):
    project_name = project.get("name")
    project_description = project.get("description")
    project_is_component = project.get("is_component")
    project_updated_at = project.get("updated_at")
    if not project_updated_at:
        updated_at_datetime = datetime.now(tz=timezone.utc)
    else:
        updated_at_datetime = datetime.fromisoformat(project_updated_at)
    project_data = project.get("data")
    project_icon = project.get("icon")
    project_icon = demojize(project_icon) if project_icon and purely_emoji(project_icon) else project_icon
    project_icon_bg_color = project.get("icon_bg_color")
    project_gradient = project.get("gradient")
    project_tags = project.get("tags")
    return (
        project_name,
        project_description,
        project_is_component,
        updated_at_datetime,
        project_data,
        project_icon,
        project_icon_bg_color,
        project_gradient,
        project_tags,
    )


def update_project_file(project_path: Path, project: dict, updated_project_data) -> None:
    project["data"] = updated_project_data
    project_path.write_text(orjson.dumps(project, option=ORJSON_OPTIONS).decode(), encoding="utf-8")
    logger.info(f"Updated starter project {project['name']} file")

def update_existing_project(
    existing_project,
    project_name,
    project_description,
    project_is_component,
    updated_at_datetime,
    project_data,
    project_icon,
    project_icon_bg_color,
) -> None:
    logger.info(f"Updating starter project {project_name}")
    existing_project.data = project_data
    existing_project.folder = STARTER_FOLDER_NAME
    existing_project.description = project_description
    existing_project.is_component = project_is_component
    existing_project.updated_at = updated_at_datetime
    existing_project.icon = project_icon
    existing_project.icon_bg_color = project_icon_bg_color


def create_new_project(
    session,
    project_name,
    project_description,
    project_is_component,
    updated_at_datetime,
    project_data,
    project_gradient,
    project_tags,
    project_icon,
    project_icon_bg_color,
    new_folder_id,
) -> None:
    logger.debug(f"Creating starter project {project_name}")
    new_project = FlowCreate(
        name=project_name,
        description=project_description,
        icon=project_icon,
        icon_bg_color=project_icon_bg_color,
        data=project_data,
        is_component=project_is_component,
        updated_at=updated_at_datetime,
        folder_id=new_folder_id,
        gradient=project_gradient,
        tags=project_tags,
    )
    db_flow = Flow.model_validate(new_project, from_attributes=True)
    session.add(db_flow)

def get_all_flows_similar_to_project(session, folder_id):
    return session.exec(select(Folder).where(Folder.id == folder_id)).first().flows


def delete_start_projects(session, folder_id) -> None:
    flows = session.exec(select(Folder).where(Folder.id == folder_id)).first().flows
    for flow in flows:
        session.delete(flow)
    session.commit()


def folder_exists(session, folder_name):
    folder = session.exec(select(Folder).where(Folder.name == folder_name)).first()
    return folder is not None


def create_starter_folder(session):
    if not folder_exists(session, STARTER_FOLDER_NAME):
        new_folder = FolderCreate(name=STARTER_FOLDER_NAME, description=STARTER_FOLDER_DESCRIPTION)
        db_folder = Folder.model_validate(new_folder, from_attributes=True)
        session.add(db_folder)
        session.commit()
        session.refresh(db_folder)
        return db_folder
    return session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()

def _is_valid_uuid(val):
    try:
        uuid_obj = UUID(val)
    except ValueError:
        return False
    return str(uuid_obj) == val
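# The round trip through str() also rejects values that UUID() accepts but
# that are not in canonical form, e.g. (illustrative):
#   _is_valid_uuid("2f1a8b9c-0d3e-4f5a-9b6c-7d8e9f0a1b2c")  # True
#   _is_valid_uuid("2F1A8B9C0D3E4F5A9B6C7D8E9F0A1B2C")      # False (non-canonical)
#   _is_valid_uuid("not-a-uuid")                            # False (ValueError)

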
async def load_flows_from_directory() -> None:
    """On langflow startup, this loads all flows from the directory specified in the settings.

    All flows are uploaded into the default folder for the superuser.
    Note that this feature currently only works if AUTO_LOGIN is enabled in the settings.
    """
    settings_service = get_settings_service()
    flows_path = settings_service.settings.load_flows_path
    if not flows_path:
        return
    if not settings_service.auth_settings.AUTO_LOGIN:
        logger.warning("AUTO_LOGIN is disabled, not loading flows from directory")
        return

    async with async_session_scope() as session:
        user = await get_user_by_username(session, settings_service.auth_settings.SUPERUSER)
        if user is None:
            msg = "Superuser not found in the database"
            raise NoResultFound(msg)
        user_id = user.id
        flows_path_ = Path(flows_path)
        files = [f for f in flows_path_.iterdir() if f.is_file()]
        for file_path in files:
            if file_path.suffix != ".json":
                continue
            logger.info(f"Loading flow from file: {file_path.name}")
            async with async_open(file_path, "r", encoding="utf-8") as f:
                content = await f.read()
            flow = orjson.loads(content)
            no_json_name = file_path.stem
            flow_endpoint_name = flow.get("endpoint_name")
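            # If the file is named after its flow's UUID (e.g. a hypothetical
            # "2f1a8b9c-0d3e-4f5a-9b6c-7d8e9f0a1b2c.json"), reuse the filename
            # as the flow id so reloading updates the existing row.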
            if _is_valid_uuid(no_json_name):
                flow["id"] = no_json_name
            flow_id = flow.get("id")
            existing = await find_existing_flow(session, flow_id, flow_endpoint_name)
            if existing:
                logger.debug(f"Found existing flow: {existing.name}")
                logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
                for key, value in flow.items():
                    if hasattr(existing, key):
                        # flow dict from json and db representation are not 100% the same
                        setattr(existing, key, value)
                existing.updated_at = datetime.now(tz=timezone.utc).astimezone()
                existing.user_id = user_id

                # Generally, folder_id should not be None, but we must check this due to the previous
                # behavior where flows could be added and folder_id was None, orphaning
                # them within Langflow.
                if existing.folder_id is None:
                    folder_id = await get_default_folder_id(session, user_id)
                    existing.folder_id = folder_id

                session.add(existing)
            else:
                logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
                # Current behavior loads all new flows into default folder
                folder_id = await get_default_folder_id(session, user_id)
                flow["user_id"] = user_id
                flow["folder_id"] = folder_id
                flow = Flow.model_validate(flow, from_attributes=True)
                flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
                session.add(flow)

async def find_existing_flow(session, flow_id, flow_endpoint_name):
    if flow_endpoint_name:
        logger.debug(f"flow_endpoint_name: {flow_endpoint_name}")
        stmt = select(Flow).where(Flow.endpoint_name == flow_endpoint_name)
        if existing := (await session.exec(stmt)).first():
            logger.debug(f"Found existing flow by endpoint name: {existing.name}")
            return existing
    stmt = select(Flow).where(Flow.id == flow_id)
    if existing := (await session.exec(stmt)).first():
        logger.debug(f"Found existing flow by id: {flow_id}")
        return existing
    return None

def create_or_update_starter_projects(all_types_dict: dict) -> None:
    with session_scope() as session:
        new_folder = create_starter_folder(session)
        starter_projects = load_starter_projects()
        delete_start_projects(session, new_folder.id)
        copy_profile_pictures()
        for project_path, project in starter_projects:
            (
                project_name,
                project_description,
                project_is_component,
                updated_at_datetime,
                project_data,
                project_icon,
                project_icon_bg_color,
                project_gradient,
                project_tags,
            ) = get_project_data(project)
            updated_project_data = update_projects_components_with_latest_component_versions(
                project_data.copy(), all_types_dict
            )
            updated_project_data = update_edges_with_latest_component_versions(updated_project_data)
            if updated_project_data != project_data:
                project_data = updated_project_data
                # We also need to update the project data in the file
                update_project_file(project_path, project, updated_project_data)
            if project_name and project_data:
                for existing_project in get_all_flows_similar_to_project(session, new_folder.id):
                    session.delete(existing_project)
                create_new_project(
                    session=session,
                    project_name=project_name,
                    project_description=project_description,
                    project_is_component=project_is_component,
                    updated_at_datetime=updated_at_datetime,
                    project_data=project_data,
                    project_icon=project_icon,
                    project_icon_bg_color=project_icon_bg_color,
                    project_gradient=project_gradient,
                    project_tags=project_tags,
                    new_folder_id=new_folder.id,
                )

async def initialize_super_user_if_needed() -> None:
    settings_service = get_settings_service()
    if not settings_service.auth_settings.AUTO_LOGIN:
        return
    username = settings_service.auth_settings.SUPERUSER
    password = settings_service.auth_settings.SUPERUSER_PASSWORD
    if not username or not password:
        msg = "SUPERUSER and SUPERUSER_PASSWORD must be set in the settings if AUTO_LOGIN is true."
        raise ValueError(msg)

    async with async_session_scope() as async_session:
        super_user = await create_super_user(db=async_session, username=username, password=password)
        await get_variable_service().initialize_user_variables(super_user.id, async_session)
        await create_default_folder_if_it_doesnt_exist(async_session, super_user.id)
        logger.info("Super user initialized")