Spaces:
Running
Running
import io | |
import json | |
import zipfile | |
from datetime import datetime, timezone | |
from typing import Annotated | |
import orjson | |
from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status | |
from fastapi.encoders import jsonable_encoder | |
from fastapi.responses import StreamingResponse | |
from fastapi_pagination import Params | |
from fastapi_pagination.ext.sqlmodel import paginate | |
from sqlalchemy import or_, update | |
from sqlalchemy.orm import selectinload | |
from sqlmodel import select | |
from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, custom_params, remove_api_keys | |
from langflow.api.v1.flows import create_flows | |
from langflow.api.v1.schemas import FlowListCreate | |
from langflow.helpers.flow import generate_unique_flow_name | |
from langflow.helpers.folders import generate_unique_folder_name | |
from langflow.initial_setup.constants import STARTER_FOLDER_NAME | |
from langflow.services.database.models.flow.model import Flow, FlowCreate, FlowRead | |
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME | |
from langflow.services.database.models.folder.model import ( | |
Folder, | |
FolderCreate, | |
FolderRead, | |
FolderReadWithFlows, | |
FolderUpdate, | |
) | |
from langflow.services.database.models.folder.pagination_model import FolderWithPaginatedFlows | |
router = APIRouter(prefix="/folders", tags=["Folders"]) | |
async def create_folder( | |
*, | |
session: DbSession, | |
folder: FolderCreate, | |
current_user: CurrentActiveUser, | |
): | |
try: | |
new_folder = Folder.model_validate(folder, from_attributes=True) | |
new_folder.user_id = current_user.id | |
# First check if the folder.name is unique | |
# there might be flows with name like: "MyFlow", "MyFlow (1)", "MyFlow (2)" | |
# so we need to check if the name is unique with `like` operator | |
# if we find a flow with the same name, we add a number to the end of the name | |
# based on the highest number found | |
if ( | |
await session.exec( | |
statement=select(Folder).where(Folder.name == new_folder.name).where(Folder.user_id == current_user.id) | |
) | |
).first(): | |
folder_results = await session.exec( | |
select(Folder).where( | |
Folder.name.like(f"{new_folder.name}%"), # type: ignore[attr-defined] | |
Folder.user_id == current_user.id, | |
) | |
) | |
if folder_results: | |
folder_names = [folder.name for folder in folder_results] | |
folder_numbers = [int(name.split("(")[-1].split(")")[0]) for name in folder_names if "(" in name] | |
if folder_numbers: | |
new_folder.name = f"{new_folder.name} ({max(folder_numbers) + 1})" | |
else: | |
new_folder.name = f"{new_folder.name} (1)" | |
session.add(new_folder) | |
await session.commit() | |
await session.refresh(new_folder) | |
if folder.components_list: | |
update_statement_components = ( | |
update(Flow).where(Flow.id.in_(folder.components_list)).values(folder_id=new_folder.id) # type: ignore[attr-defined] | |
) | |
await session.exec(update_statement_components) | |
await session.commit() | |
if folder.flows_list: | |
update_statement_flows = update(Flow).where(Flow.id.in_(folder.flows_list)).values(folder_id=new_folder.id) # type: ignore[attr-defined] | |
await session.exec(update_statement_flows) | |
await session.commit() | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
return new_folder | |
async def read_folders( | |
*, | |
session: DbSession, | |
current_user: CurrentActiveUser, | |
): | |
try: | |
folders = ( | |
await session.exec( | |
select(Folder).where( | |
or_(Folder.user_id == current_user.id, Folder.user_id == None) # noqa: E711 | |
) | |
) | |
).all() | |
folders = [folder for folder in folders if folder.name != STARTER_FOLDER_NAME] | |
return sorted(folders, key=lambda x: x.name != DEFAULT_FOLDER_NAME) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
async def read_folder( | |
*, | |
session: DbSession, | |
folder_id: str, | |
current_user: CurrentActiveUser, | |
params: Annotated[Params | None, Depends(custom_params)], | |
is_component: bool = False, | |
is_flow: bool = False, | |
search: str = "", | |
): | |
try: | |
folder = ( | |
await session.exec( | |
select(Folder) | |
.options(selectinload(Folder.flows)) | |
.where(Folder.id == folder_id, Folder.user_id == current_user.id) | |
) | |
).first() | |
except Exception as e: | |
if "No result found" in str(e): | |
raise HTTPException(status_code=404, detail="Folder not found") from e | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
if not folder: | |
raise HTTPException(status_code=404, detail="Folder not found") | |
try: | |
if params and params.page and params.size: | |
stmt = select(Flow).where(Flow.folder_id == folder_id) | |
if Flow.updated_at is not None: | |
stmt = stmt.order_by(Flow.updated_at.desc()) # type: ignore[attr-defined] | |
if is_component: | |
stmt = stmt.where(Flow.is_component == True) # noqa: E712 | |
if is_flow: | |
stmt = stmt.where(Flow.is_component == False) # noqa: E712 | |
if search: | |
stmt = stmt.where(Flow.name.like(f"%{search}%")) # type: ignore[attr-defined] | |
paginated_flows = await paginate(session, stmt, params=params) | |
return FolderWithPaginatedFlows(folder=FolderRead.model_validate(folder), flows=paginated_flows) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id] | |
folder.flows = flows_from_current_user_in_folder | |
return folder | |
async def update_folder( | |
*, | |
session: DbSession, | |
folder_id: str, | |
folder: FolderUpdate, # Assuming FolderUpdate is a Pydantic model defining updatable fields | |
current_user: CurrentActiveUser, | |
): | |
try: | |
existing_folder = ( | |
await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) | |
).first() | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
if not existing_folder: | |
raise HTTPException(status_code=404, detail="Folder not found") | |
try: | |
if folder.name and folder.name != existing_folder.name: | |
existing_folder.name = folder.name | |
session.add(existing_folder) | |
await session.commit() | |
await session.refresh(existing_folder) | |
return existing_folder | |
folder_data = existing_folder.model_dump(exclude_unset=True) | |
for key, value in folder_data.items(): | |
if key not in {"components", "flows"}: | |
setattr(existing_folder, key, value) | |
session.add(existing_folder) | |
await session.commit() | |
await session.refresh(existing_folder) | |
concat_folder_components = folder.components + folder.flows | |
flows_ids = (await session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id))).all() | |
excluded_flows = list(set(flows_ids) - set(concat_folder_components)) | |
my_collection_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() | |
if my_collection_folder: | |
update_statement_my_collection = ( | |
update(Flow).where(Flow.id.in_(excluded_flows)).values(folder_id=my_collection_folder.id) # type: ignore[attr-defined] | |
) | |
await session.exec(update_statement_my_collection) | |
await session.commit() | |
if concat_folder_components: | |
update_statement_components = ( | |
update(Flow).where(Flow.id.in_(concat_folder_components)).values(folder_id=existing_folder.id) # type: ignore[attr-defined] | |
) | |
await session.exec(update_statement_components) | |
await session.commit() | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
return existing_folder | |
async def delete_folder( | |
*, | |
session: DbSession, | |
folder_id: str, | |
current_user: CurrentActiveUser, | |
): | |
try: | |
flows = ( | |
await session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)) | |
).all() | |
if len(flows) > 0: | |
for flow in flows: | |
await cascade_delete_flow(session, flow.id) | |
folder = ( | |
await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) | |
).first() | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
if not folder: | |
raise HTTPException(status_code=404, detail="Folder not found") | |
try: | |
await session.delete(folder) | |
await session.commit() | |
return Response(status_code=status.HTTP_204_NO_CONTENT) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
async def download_file( | |
*, | |
session: DbSession, | |
folder_id: str, | |
current_user: CurrentActiveUser, | |
): | |
"""Download all flows from folder as a zip file.""" | |
try: | |
query = select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id) | |
result = await session.exec(query) | |
folder = result.first() | |
if not folder: | |
raise HTTPException(status_code=404, detail="Folder not found") | |
flows_query = select(Flow).where(Flow.folder_id == folder_id) | |
flows_result = await session.exec(flows_query) | |
flows = [FlowRead.model_validate(flow, from_attributes=True) for flow in flows_result.all()] | |
if not flows: | |
raise HTTPException(status_code=404, detail="No flows found in folder") | |
flows_without_api_keys = [remove_api_keys(flow.model_dump()) for flow in flows] | |
zip_stream = io.BytesIO() | |
with zipfile.ZipFile(zip_stream, "w") as zip_file: | |
for flow in flows_without_api_keys: | |
flow_json = json.dumps(jsonable_encoder(flow)) | |
zip_file.writestr(f"{flow['name']}.json", flow_json) | |
zip_stream.seek(0) | |
current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y%m%d_%H%M%S") | |
filename = f"{current_time}_{folder.name}_flows.zip" | |
return StreamingResponse( | |
zip_stream, | |
media_type="application/x-zip-compressed", | |
headers={"Content-Disposition": f"attachment; filename={filename}"}, | |
) | |
except Exception as e: | |
if "No result found" in str(e): | |
raise HTTPException(status_code=404, detail="Folder not found") from e | |
raise HTTPException(status_code=500, detail=str(e)) from e | |
async def upload_file( | |
*, | |
session: DbSession, | |
file: Annotated[UploadFile, File(...)], | |
current_user: CurrentActiveUser, | |
): | |
"""Upload flows from a file.""" | |
contents = await file.read() | |
data = orjson.loads(contents) | |
if not data: | |
raise HTTPException(status_code=400, detail="No flows found in the file") | |
folder_name = await generate_unique_folder_name(data["folder_name"], current_user.id, session) | |
data["folder_name"] = folder_name | |
folder = FolderCreate(name=data["folder_name"], description=data["folder_description"]) | |
new_folder = Folder.model_validate(folder, from_attributes=True) | |
new_folder.id = None | |
new_folder.user_id = current_user.id | |
session.add(new_folder) | |
await session.commit() | |
await session.refresh(new_folder) | |
del data["folder_name"] | |
del data["folder_description"] | |
if "flows" in data: | |
flow_list = FlowListCreate(flows=[FlowCreate(**flow) for flow in data["flows"]]) | |
else: | |
raise HTTPException(status_code=400, detail="No flows found in the data") | |
# Now we set the user_id for all flows | |
for flow in flow_list.flows: | |
flow_name = await generate_unique_flow_name(flow.name, current_user.id, session) | |
flow.name = flow_name | |
flow.user_id = current_user.id | |
flow.folder_id = new_folder.id | |
return await create_flows(session=session, flow_list=flow_list, current_user=current_user) | |