Tai Truong
fix readme
d202ada
import hashlib
from datetime import datetime, timezone
from http import HTTPStatus
from io import BytesIO
from pathlib import Path
from typing import Annotated
from urllib.parse import quote
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, UploadFile
from fastapi.responses import StreamingResponse

from langflow.api.utils import CurrentActiveUser, DbSession
from langflow.api.v1.schemas import UploadFileResponse
from langflow.services.database.models.flow import Flow
from langflow.services.deps import get_storage_service
from langflow.services.storage.service import StorageService
from langflow.services.storage.utils import build_content_type_from_extension
# Router that the file endpoints in this module are registered on; it carries
# the "/files" path prefix and the "Files" tag.
router = APIRouter(tags=["Files"], prefix="/files")
# Dependency that takes the flow_id from the request, looks the flow up in the
# database, and returns the id (as a string) only if the flow exists and is
# owned by the current user.
async def get_flow_id(
    flow_id: UUID,
    current_user: CurrentActiveUser,
    session: DbSession,
):
    """Validate that ``flow_id`` names a flow owned by the current user.

    Returns:
        The flow id rendered as a string.

    Raises:
        HTTPException: 404 when the session finds no flow for this id, 403 when
            the flow's ``user_id`` differs from ``current_user.id``.
    """
    flow_key = str(flow_id)
    # NOTE(review): an earlier comment here recorded
    # "AttributeError: 'SelectOfScalar' object has no attribute 'first'";
    # presumably that is why a direct session.get lookup is used - confirm.
    found = await session.get(Flow, flow_key)
    if not found:
        raise HTTPException(status_code=404, detail="Flow not found")
    if found.user_id == current_user.id:
        return flow_key
    raise HTTPException(status_code=403, detail="You don't have access to this flow")
@router.post("/upload/{flow_id}", status_code=HTTPStatus.CREATED)
async def upload_file(
    *,
    file: UploadFile,
    flow_id: Annotated[UUID, Depends(get_flow_id)],
    current_user: CurrentActiveUser,
    session: DbSession,
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
) -> UploadFileResponse:
    """Store an uploaded file under the folder named after the flow id.

    The stored name is ``<timestamp>_<name>``, where ``<name>`` is the basename
    of the client-supplied filename or, when none was supplied, the SHA-256 hex
    digest of the file content.

    Returns:
        UploadFileResponse carrying the flow id and ``<flow_id>/<stored name>``.

    Raises:
        HTTPException: 404 when the flow cannot be found, 403 when the flow
            belongs to another user, 500 when the database lookup or the
            storage write fails.
    """
    flow_id_str = str(flow_id)
    try:
        flow = await session.get(Flow, flow_id_str)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # session.get yields None for a missing row; without this guard the
    # ownership check below would fail with an unhandled AttributeError.
    if flow is None:
        raise HTTPException(status_code=404, detail="Flow not found")
    if flow.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="You don't have access to this flow")
    try:
        file_content = await file.read()
        # Aware UTC "now" converted to the server's local zone before formatting.
        timestamp = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d_%H-%M-%S")
        # The filename is client-controlled: keep only its final path component
        # so it cannot introduce extra directories into the storage path. The
        # download routes take the name as a single path segment, so a name
        # with separators could not be fetched back anyway.
        base_name = Path(file.filename).name if file.filename else ""
        file_name = base_name or hashlib.sha256(file_content).hexdigest()
        full_file_name = f"{timestamp}_{file_name}"
        folder = flow_id_str
        await storage_service.save_file(flow_id=folder, file_name=full_file_name, data=file_content)
        return UploadFileResponse(flow_id=flow_id_str, file_path=f"{folder}/{full_file_name}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/download/{flow_id}/{file_name}")
async def download_file(
    file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)]
):
    """Return a stored file of a flow as an attachment download.

    The text after the last ``.`` in ``file_name`` is treated as the extension
    and must map to a content type before the file is fetched from storage.

    Raises:
        HTTPException: 500 when the extension is empty, when no content type
            can be derived from it, or when reading the file from storage (or
            building the response) fails.
    """
    flow_id_str = str(flow_id)
    extension = file_name.split(".")[-1]
    if not extension:
        raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")
    try:
        content_type = build_content_type_from_extension(extension)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    if not content_type:
        raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
    try:
        file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name)
        # Percent-encode the name so the header stays ASCII and well-formed for
        # names with spaces, quotes or non-ASCII characters; plain names are
        # left unchanged by quote().
        encoded_name = quote(file_name, safe="")
        headers = {
            # Disposition parameters must be separated by ";" (RFC 6266).
            "Content-Disposition": f"attachment; filename={encoded_name}; filename*=UTF-8''{encoded_name}",
            # NOTE(review): this explicit header and media_type below can
            # disagree; confirm which one the response actually sends.
            "Content-Type": "application/octet-stream",
            "Content-Length": str(len(file_content)),
        }
        return StreamingResponse(BytesIO(file_content), media_type=content_type, headers=headers)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/images/{flow_id}/{file_name}")
async def download_image(file_name: str, flow_id: UUID):
    """Stream a stored image belonging to a flow.

    The part of ``file_name`` after its last ``.`` is used as the extension;
    the content type derived from it must start with ``image``.

    Raises:
        HTTPException: 500 when the extension is empty, when no content type
            is found, when the content type is not an image type, or when the
            storage read fails.
    """
    storage = get_storage_service()
    flow_key = str(flow_id)
    extension = file_name.rpartition(".")[2]
    if not extension:
        raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")

    try:
        content_type = build_content_type_from_extension(extension)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    if not content_type:
        raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
    if not content_type.startswith("image"):
        raise HTTPException(status_code=500, detail=f"Content type {content_type} is not an image")

    try:
        payload = await storage.get_file(flow_id=flow_key, file_name=file_name)
        return StreamingResponse(BytesIO(payload), media_type=content_type)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
@router.get("/profile_pictures/{folder_name}/{file_name}")
async def download_profile_picture(
    folder_name: str,
    file_name: str,
):
    """Stream a profile picture from ``<config_dir>/profile_pictures/<folder_name>``.

    Raises:
        HTTPException: 400 when ``folder_name`` or ``file_name`` is not a plain
            single path component, 500 when resolving the content type or
            reading the file fails.
    """
    # Both values come straight from the URL. Reject anything that is not a
    # plain name so a request cannot climb out of the profile_pictures folder
    # (e.g. a folder_name of ".." would otherwise address config_dir itself).
    # This runs before the try block because HTTPException is an Exception and
    # would be rewrapped as a 500 by the handler below.
    for component in (folder_name, file_name):
        if component in {"", ".", ".."} or "/" in component or "\\" in component:
            raise HTTPException(status_code=400, detail="Invalid profile picture path")
    try:
        storage_service = get_storage_service()
        extension = file_name.split(".")[-1]
        config_dir = storage_service.settings_service.settings.config_dir
        config_path = Path(config_dir)  # type: ignore[arg-type]
        folder_path = config_path / "profile_pictures" / folder_name
        content_type = build_content_type_from_extension(extension)
        # The storage service is given the folder path in place of a flow id.
        file_content = await storage_service.get_file(flow_id=folder_path, file_name=file_name)  # type: ignore[arg-type]
        return StreamingResponse(BytesIO(file_content), media_type=content_type)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/profile_pictures/list")
async def list_profile_pictures():
    """List the profile pictures found in the ``People`` and ``Space`` folders.

    Returns:
        ``{"files": [...]}`` with every ``People/<name>`` entry first, followed
        by every ``Space/<name>`` entry.

    Raises:
        HTTPException: 500 when the storage service or the directory listing
            fails.
    """
    try:
        storage = get_storage_service()
        base_dir = Path(storage.settings_service.settings.config_dir)  # type: ignore[arg-type]
        people = await storage.list_files(flow_id=base_dir / "profile_pictures/People")  # type: ignore[arg-type]
        space = await storage.list_files(flow_id=base_dir / "profile_pictures/Space")  # type: ignore[arg-type]
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return {
        "files": [
            *(f"People/{i}" for i in people),
            *(f"Space/{i}" for i in space),
        ]
    }
@router.get("/list/{flow_id}")
async def list_files(
    flow_id: Annotated[UUID, Depends(get_flow_id)],
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
):
    """List the files stored for a flow.

    Returns:
        ``{"files": ...}`` holding whatever the storage service lists for the
        flow id.

    Raises:
        HTTPException: 500 when the storage listing fails.
    """
    try:
        stored = await storage_service.list_files(flow_id=str(flow_id))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    else:
        return {"files": stored}
@router.delete("/delete/{flow_id}/{file_name}")
async def delete_file(
    file_name: str,
    flow_id: Annotated[UUID, Depends(get_flow_id)],
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
):
    """Delete one stored file of a flow.

    Returns:
        A ``{"message": ...}`` confirmation naming the deleted file.

    Raises:
        HTTPException: 500 when the storage service fails to delete the file.
    """
    try:
        await storage_service.delete_file(flow_id=str(flow_id), file_name=file_name)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    else:
        return {"message": f"File {file_name} deleted successfully"}