Spaces:
Running
Running
| from __future__ import annotations | |
| import io | |
| import json | |
| import re | |
| import zipfile | |
| from datetime import datetime, timezone | |
| from typing import Annotated | |
| from uuid import UUID | |
| import orjson | |
| from fastapi import APIRouter, Depends, File, HTTPException, UploadFile | |
| from fastapi.encoders import jsonable_encoder | |
| from fastapi.responses import StreamingResponse | |
| from fastapi_pagination import Page, Params, add_pagination | |
| from fastapi_pagination.ext.sqlalchemy import paginate | |
| from sqlmodel import and_, col, select | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from langflow.api.utils import ( | |
| CurrentActiveUser, | |
| DbSession, | |
| cascade_delete_flow, | |
| remove_api_keys, | |
| validate_is_component, | |
| ) | |
| from langflow.api.v1.schemas import FlowListCreate | |
| from langflow.initial_setup.constants import STARTER_FOLDER_NAME | |
| from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate | |
| from langflow.services.database.models.flow.model import FlowHeader | |
| from langflow.services.database.models.flow.utils import get_webhook_component_in_flow | |
| from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME | |
| from langflow.services.database.models.folder.model import Folder | |
| from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id | |
| from langflow.services.database.models.vertex_builds.crud import get_vertex_builds_by_flow_id | |
| from langflow.services.deps import get_settings_service | |
| from langflow.services.settings.service import SettingsService | |
| # build router | |
| router = APIRouter(prefix="/flows", tags=["Flows"]) | |
| async def _new_flow( | |
| *, | |
| session: AsyncSession, | |
| flow: FlowCreate, | |
| user_id: UUID, | |
| ): | |
| try: | |
| """Create a new flow.""" | |
| if flow.user_id is None: | |
| flow.user_id = user_id | |
| # First check if the flow.name is unique | |
| # there might be flows with name like: "MyFlow", "MyFlow (1)", "MyFlow (2)" | |
| # so we need to check if the name is unique with `like` operator | |
| # if we find a flow with the same name, we add a number to the end of the name | |
| # based on the highest number found | |
| if (await session.exec(select(Flow).where(Flow.name == flow.name).where(Flow.user_id == user_id))).first(): | |
| flows = ( | |
| await session.exec( | |
| select(Flow).where(Flow.name.like(f"{flow.name} (%")).where(Flow.user_id == user_id) # type: ignore[attr-defined] | |
| ) | |
| ).all() | |
| if flows: | |
| extract_number = re.compile(r"\((\d+)\)$") | |
| numbers = [] | |
| for _flow in flows: | |
| result = extract_number.search(_flow.name) | |
| if result: | |
| numbers.append(int(result.groups(1)[0])) | |
| if numbers: | |
| flow.name = f"{flow.name} ({max(numbers) + 1})" | |
| else: | |
| flow.name = f"{flow.name} (1)" | |
| # Now check if the endpoint is unique | |
| if ( | |
| flow.endpoint_name | |
| and ( | |
| await session.exec( | |
| select(Flow).where(Flow.endpoint_name == flow.endpoint_name).where(Flow.user_id == user_id) | |
| ) | |
| ).first() | |
| ): | |
| flows = ( | |
| await session.exec( | |
| select(Flow) | |
| .where(Flow.endpoint_name.like(f"{flow.endpoint_name}-%")) # type: ignore[union-attr] | |
| .where(Flow.user_id == user_id) | |
| ) | |
| ).all() | |
| if flows: | |
| # The endpoint name is like "my-endpoint","my-endpoint-1", "my-endpoint-2" | |
| # so we need to get the highest number and add 1 | |
| # we need to get the last part of the endpoint name | |
| numbers = [int(flow.endpoint_name.split("-")[-1]) for flow in flows] | |
| flow.endpoint_name = f"{flow.endpoint_name}-{max(numbers) + 1}" | |
| else: | |
| flow.endpoint_name = f"{flow.endpoint_name}-1" | |
| db_flow = Flow.model_validate(flow, from_attributes=True) | |
| db_flow.updated_at = datetime.now(timezone.utc) | |
| if db_flow.folder_id is None: | |
| # Make sure flows always have a folder | |
| default_folder = ( | |
| await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == user_id)) | |
| ).first() | |
| if default_folder: | |
| db_flow.folder_id = default_folder.id | |
| session.add(db_flow) | |
| except Exception as e: | |
| # If it is a validation error, return the error message | |
| if hasattr(e, "errors"): | |
| raise HTTPException(status_code=400, detail=str(e)) from e | |
| if isinstance(e, HTTPException): | |
| raise | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| return db_flow | |
| async def create_flow( | |
| *, | |
| session: DbSession, | |
| flow: FlowCreate, | |
| current_user: CurrentActiveUser, | |
| ): | |
| try: | |
| db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id) | |
| await session.commit() | |
| await session.refresh(db_flow) | |
| except Exception as e: | |
| if "UNIQUE constraint failed" in str(e): | |
| # Get the name of the column that failed | |
| columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] | |
| # UNIQUE constraint failed: flow.user_id, flow.name | |
| # or UNIQUE constraint failed: flow.name | |
| # if the column has id in it, we want the other column | |
| column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] | |
| raise HTTPException( | |
| status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" | |
| ) from e | |
| if isinstance(e, HTTPException): | |
| raise | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| return db_flow | |
| async def read_flows( | |
| *, | |
| current_user: CurrentActiveUser, | |
| session: DbSession, | |
| remove_example_flows: bool = False, | |
| components_only: bool = False, | |
| get_all: bool = True, | |
| folder_id: UUID | None = None, | |
| params: Annotated[Params, Depends()], | |
| header_flows: bool = False, | |
| ): | |
| """Retrieve a list of flows with pagination support. | |
| Args: | |
| current_user (User): The current authenticated user. | |
| session (Session): The database session. | |
| settings_service (SettingsService): The settings service. | |
| components_only (bool, optional): Whether to return only components. Defaults to False. | |
| get_all (bool, optional): Whether to return all flows without pagination. Defaults to True. | |
| **This field must be True because of backward compatibility with the frontend - Release: 1.0.20** | |
| folder_id (UUID, optional): The folder ID. Defaults to None. | |
| params (Params): Pagination parameters. | |
| remove_example_flows (bool, optional): Whether to remove example flows. Defaults to False. | |
| header_flows (bool, optional): Whether to return only specific headers of the flows. Defaults to False. | |
| Returns: | |
| list[FlowRead] | Page[FlowRead] | list[FlowHeader] | |
| A list of flows or a paginated response containing the list of flows or a list of flow headers. | |
| """ | |
| try: | |
| auth_settings = get_settings_service().auth_settings | |
| default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() | |
| default_folder_id = default_folder.id if default_folder else None | |
| starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first() | |
| starter_folder_id = starter_folder.id if starter_folder else None | |
| if not starter_folder and not default_folder: | |
| raise HTTPException( | |
| status_code=404, | |
| detail="Starter folder and default folder not found. Please create a folder and add flows to it.", | |
| ) | |
| if not folder_id: | |
| folder_id = default_folder_id | |
| if auth_settings.AUTO_LOGIN: | |
| stmt = select(Flow).where( | |
| (Flow.user_id == None) | (Flow.user_id == current_user.id) # noqa: E711 | |
| ) | |
| else: | |
| stmt = select(Flow).where(Flow.user_id == current_user.id) | |
| if remove_example_flows: | |
| stmt = stmt.where(Flow.folder_id != starter_folder_id) | |
| if components_only: | |
| stmt = stmt.where(Flow.is_component == True) # noqa: E712 | |
| if get_all: | |
| flows = (await session.exec(stmt)).all() | |
| flows = validate_is_component(flows) | |
| if components_only: | |
| flows = [flow for flow in flows if flow.is_component] | |
| if remove_example_flows and starter_folder_id: | |
| flows = [flow for flow in flows if flow.folder_id != starter_folder_id] | |
| if header_flows: | |
| return [ | |
| { | |
| "id": flow.id, | |
| "name": flow.name, | |
| "folder_id": flow.folder_id, | |
| "is_component": flow.is_component, | |
| "description": flow.description, | |
| } | |
| for flow in flows | |
| ] | |
| return flows | |
| stmt = stmt.where(Flow.folder_id == folder_id) | |
| return await paginate(session, stmt, params=params) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| async def _read_flow( | |
| session: AsyncSession, | |
| flow_id: UUID, | |
| user_id: UUID, | |
| settings_service: SettingsService, | |
| ): | |
| """Read a flow.""" | |
| auth_settings = settings_service.auth_settings | |
| stmt = select(Flow).where(Flow.id == flow_id) | |
| if auth_settings.AUTO_LOGIN: | |
| # If auto login is enable user_id can be current_user.id or None | |
| # so write an OR | |
| stmt = stmt.where( | |
| (Flow.user_id == user_id) | (Flow.user_id == None) # noqa: E711 | |
| ) | |
| return (await session.exec(stmt)).first() | |
| async def read_flow( | |
| *, | |
| session: DbSession, | |
| flow_id: UUID, | |
| current_user: CurrentActiveUser, | |
| ): | |
| """Read a flow.""" | |
| if user_flow := await _read_flow(session, flow_id, current_user.id, get_settings_service()): | |
| return user_flow | |
| raise HTTPException(status_code=404, detail="Flow not found") | |
| async def update_flow( | |
| *, | |
| session: DbSession, | |
| flow_id: UUID, | |
| flow: FlowUpdate, | |
| current_user: CurrentActiveUser, | |
| ): | |
| """Update a flow.""" | |
| settings_service = get_settings_service() | |
| try: | |
| db_flow = await _read_flow( | |
| session=session, | |
| flow_id=flow_id, | |
| user_id=current_user.id, | |
| settings_service=settings_service, | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| if not db_flow: | |
| raise HTTPException(status_code=404, detail="Flow not found") | |
| try: | |
| flow_data = flow.model_dump(exclude_unset=True) | |
| if settings_service.settings.remove_api_keys: | |
| flow_data = remove_api_keys(flow_data) | |
| for key, value in flow_data.items(): | |
| setattr(db_flow, key, value) | |
| webhook_component = get_webhook_component_in_flow(db_flow.data) | |
| db_flow.webhook = webhook_component is not None | |
| db_flow.updated_at = datetime.now(timezone.utc) | |
| if db_flow.folder_id is None: | |
| default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() | |
| if default_folder: | |
| db_flow.folder_id = default_folder.id | |
| session.add(db_flow) | |
| await session.commit() | |
| await session.refresh(db_flow) | |
| except Exception as e: | |
| # If it is a validation error, return the error message | |
| if hasattr(e, "errors"): | |
| raise HTTPException(status_code=400, detail=str(e)) from e | |
| if "UNIQUE constraint failed" in str(e): | |
| # Get the name of the column that failed | |
| columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] | |
| # UNIQUE constraint failed: flow.user_id, flow.name | |
| # or UNIQUE constraint failed: flow.name | |
| # if the column has id in it, we want the other column | |
| column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] | |
| raise HTTPException( | |
| status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" | |
| ) from e | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| return db_flow | |
| async def delete_flow( | |
| *, | |
| session: DbSession, | |
| flow_id: UUID, | |
| current_user: CurrentActiveUser, | |
| ): | |
| """Delete a flow.""" | |
| flow = await _read_flow( | |
| session=session, | |
| flow_id=flow_id, | |
| user_id=current_user.id, | |
| settings_service=get_settings_service(), | |
| ) | |
| if not flow: | |
| raise HTTPException(status_code=404, detail="Flow not found") | |
| await cascade_delete_flow(session, flow.id) | |
| await session.commit() | |
| return {"message": "Flow deleted successfully"} | |
| async def create_flows( | |
| *, | |
| session: DbSession, | |
| flow_list: FlowListCreate, | |
| current_user: CurrentActiveUser, | |
| ): | |
| """Create multiple new flows.""" | |
| db_flows = [] | |
| for flow in flow_list.flows: | |
| flow.user_id = current_user.id | |
| db_flow = Flow.model_validate(flow, from_attributes=True) | |
| session.add(db_flow) | |
| db_flows.append(db_flow) | |
| await session.commit() | |
| for db_flow in db_flows: | |
| await session.refresh(db_flow) | |
| return db_flows | |
| async def upload_file( | |
| *, | |
| session: DbSession, | |
| file: Annotated[UploadFile, File(...)], | |
| current_user: CurrentActiveUser, | |
| folder_id: UUID | None = None, | |
| ): | |
| """Upload flows from a file.""" | |
| contents = await file.read() | |
| data = orjson.loads(contents) | |
| response_list = [] | |
| flow_list = FlowListCreate(**data) if "flows" in data else FlowListCreate(flows=[FlowCreate(**data)]) | |
| # Now we set the user_id for all flows | |
| for flow in flow_list.flows: | |
| flow.user_id = current_user.id | |
| if folder_id: | |
| flow.folder_id = folder_id | |
| response = await _new_flow(session=session, flow=flow, user_id=current_user.id) | |
| response_list.append(response) | |
| try: | |
| await session.commit() | |
| for db_flow in response_list: | |
| await session.refresh(db_flow) | |
| except Exception as e: | |
| if "UNIQUE constraint failed" in str(e): | |
| # Get the name of the column that failed | |
| columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] | |
| # UNIQUE constraint failed: flow.user_id, flow.name | |
| # or UNIQUE constraint failed: flow.name | |
| # if the column has id in it, we want the other column | |
| column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] | |
| raise HTTPException( | |
| status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" | |
| ) from e | |
| if isinstance(e, HTTPException): | |
| raise | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| return response_list | |
| async def delete_multiple_flows( | |
| flow_ids: list[UUID], | |
| user: CurrentActiveUser, | |
| db: DbSession, | |
| ): | |
| """Delete multiple flows by their IDs. | |
| Args: | |
| flow_ids (List[str]): The list of flow IDs to delete. | |
| user (User, optional): The user making the request. Defaults to the current active user. | |
| db (Session, optional): The database session. | |
| Returns: | |
| dict: A dictionary containing the number of flows deleted. | |
| """ | |
| try: | |
| flows_to_delete = ( | |
| await db.exec(select(Flow).where(col(Flow.id).in_(flow_ids)).where(Flow.user_id == user.id)) | |
| ).all() | |
| for flow in flows_to_delete: | |
| transactions_to_delete = await get_transactions_by_flow_id(db, flow.id) | |
| for transaction in transactions_to_delete: | |
| await db.delete(transaction) | |
| builds_to_delete = await get_vertex_builds_by_flow_id(db, flow.id) | |
| for build in builds_to_delete: | |
| await db.delete(build) | |
| await db.delete(flow) | |
| await db.commit() | |
| return {"deleted": len(flows_to_delete)} | |
| except Exception as exc: | |
| raise HTTPException(status_code=500, detail=str(exc)) from exc | |
| async def download_multiple_file( | |
| flow_ids: list[UUID], | |
| user: CurrentActiveUser, | |
| db: DbSession, | |
| ): | |
| """Download all flows as a zip file.""" | |
| flows = (await db.exec(select(Flow).where(and_(Flow.user_id == user.id, Flow.id.in_(flow_ids))))).all() # type: ignore[attr-defined] | |
| if not flows: | |
| raise HTTPException(status_code=404, detail="No flows found.") | |
| flows_without_api_keys = [remove_api_keys(flow.model_dump()) for flow in flows] | |
| if len(flows_without_api_keys) > 1: | |
| # Create a byte stream to hold the ZIP file | |
| zip_stream = io.BytesIO() | |
| # Create a ZIP file | |
| with zipfile.ZipFile(zip_stream, "w") as zip_file: | |
| for flow in flows_without_api_keys: | |
| # Convert the flow object to JSON | |
| flow_json = json.dumps(jsonable_encoder(flow)) | |
| # Write the JSON to the ZIP file | |
| zip_file.writestr(f"{flow['name']}.json", flow_json) | |
| # Seek to the beginning of the byte stream | |
| zip_stream.seek(0) | |
| # Generate the filename with the current datetime | |
| current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y%m%d_%H%M%S") | |
| filename = f"{current_time}_langflow_flows.zip" | |
| return StreamingResponse( | |
| zip_stream, | |
| media_type="application/x-zip-compressed", | |
| headers={"Content-Disposition": f"attachment; filename={filename}"}, | |
| ) | |
| return flows_without_api_keys[0] | |
| async def read_basic_examples( | |
| *, | |
| session: DbSession, | |
| ): | |
| """Retrieve a list of basic example flows. | |
| Args: | |
| session (Session): The database session. | |
| Returns: | |
| list[FlowRead]: A list of basic example flows. | |
| """ | |
| try: | |
| # Get the starter folder | |
| starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first() | |
| if not starter_folder: | |
| return [] | |
| # Get all flows in the starter folder | |
| return (await session.exec(select(Flow).where(Flow.folder_id == starter_folder.id))).all() | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) from e | |
| add_pagination(router) | |