Spaces:
Running
Running
import hashlib | |
from datetime import datetime, timezone | |
from http import HTTPStatus | |
from io import BytesIO | |
from pathlib import Path | |
from typing import Annotated | |
from uuid import UUID | |
from fastapi import APIRouter, Depends, HTTPException, UploadFile | |
from fastapi.responses import StreamingResponse | |
from langflow.api.utils import CurrentActiveUser, DbSession | |
from langflow.api.v1.schemas import UploadFileResponse | |
from langflow.services.database.models.flow import Flow | |
from langflow.services.deps import get_storage_service | |
from langflow.services.storage.service import StorageService | |
from langflow.services.storage.utils import build_content_type_from_extension | |
# Router that groups every endpoint in this module under the "/files" prefix.
router = APIRouter(prefix="/files", tags=["Files"])
async def get_flow_id(
    flow_id: UUID,
    current_user: CurrentActiveUser,
    session: DbSession,
):
    """Dependency that validates a flow id and returns it as a string.

    The flow is loaded from the database by its id and must be owned by the
    current user.

    Args:
        flow_id: Id of the flow taken from the request.
        current_user: The authenticated user making the request.
        session: Database session used for the lookup.

    Returns:
        The flow id converted to ``str``.

    Raises:
        HTTPException: 404 if no flow is found, 403 if the flow's
            ``user_id`` differs from the current user's id.
    """
    flow_key = str(flow_id)
    found = await session.get(Flow, flow_key)

    # Happy path first: the flow exists and belongs to the caller.
    if found and found.user_id == current_user.id:
        return flow_key

    if not found:
        raise HTTPException(status_code=404, detail="Flow not found")
    raise HTTPException(status_code=403, detail="You don't have access to this flow")
async def upload_file(
    *,
    file: UploadFile,
    flow_id: Annotated[UUID, Depends(get_flow_id)],
    current_user: CurrentActiveUser,
    session: DbSession,
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
) -> UploadFileResponse:
    """Store an uploaded file under the folder of the given flow.

    The file is saved as ``<timestamp>_<name>`` where the timestamp is the
    current time formatted in the system's local timezone and ``<name>`` is
    the basename of the client-supplied filename, or the SHA-256 hex digest
    of the content when no usable filename is provided.

    Args:
        file: The uploaded file.
        flow_id: Flow id resolved through the ``get_flow_id`` dependency.
        current_user: The authenticated user making the request.
        session: Database session used to load the flow.
        storage_service: Storage backend that persists the file.

    Returns:
        An ``UploadFileResponse`` carrying the flow id and the stored
        ``<flow_id>/<file name>`` path.

    Raises:
        HTTPException: 404 if the flow does not exist, 403 if it is owned by
            another user, 500 if the lookup or the save fails.
    """
    try:
        flow_id_str = str(flow_id)
        flow = await session.get(Flow, flow_id_str)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # session.get may return None; guard before touching flow.user_id so a
    # missing flow is reported as 404 instead of an unhandled AttributeError.
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    if flow.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="You don't have access to this flow")

    try:
        file_content = await file.read()
        timestamp = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d_%H-%M-%S")

        # The filename comes from the client: keep only its final path
        # component (treating "\\" as a separator too) so it cannot introduce
        # extra directories into the storage path.
        client_name = (file.filename or "").replace("\\", "/")
        file_name = Path(client_name).name or hashlib.sha256(file_content).hexdigest()

        full_file_name = f"{timestamp}_{file_name}"
        folder = flow_id_str
        await storage_service.save_file(flow_id=folder, file_name=full_file_name, data=file_content)
        return UploadFileResponse(flow_id=flow_id_str, file_path=f"{folder}/{full_file_name}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_file(
    file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)]
):
    """Return a stored file of a flow as an attachment download.

    Args:
        file_name: Name of the file inside the flow's folder.
        flow_id: Id of the flow whose folder holds the file.
        storage_service: Storage backend the file is read from.

    Returns:
        A ``StreamingResponse`` over the file content with
        ``Content-Disposition``, ``Content-Type`` and ``Content-Length``
        headers set.

    Raises:
        HTTPException: 500 if the extension is empty, no content type can be
            built for it, or reading the file fails.

    NOTE(review): ``flow_id`` is a plain ``UUID`` here and is not resolved
    through ``get_flow_id``, so no ownership check happens in this function.
    Confirm that this is intended.
    """
    # Imported locally so this block brings its own new dependency.
    from urllib.parse import quote

    flow_id_str = str(flow_id)
    extension = file_name.split(".")[-1]
    if not extension:
        raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")
    try:
        content_type = build_content_type_from_extension(extension)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    if not content_type:
        raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}")
    try:
        file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name)

        # Content-Disposition needs ";" between parameters, a quoted ASCII
        # fallback in `filename`, and a percent-encoded value in `filename*`.
        # Anything outside printable ASCII (plus quote and backslash) is
        # replaced in the fallback so the header value stays well-formed.
        ascii_name = "".join(
            ch if " " <= ch <= "~" and ch not in '"\\' else "_" for ch in file_name
        )
        encoded_name = quote(file_name, safe="")
        disposition = f'attachment; filename="{ascii_name}"; ' + f"filename*=UTF-8''{encoded_name}"

        headers = {
            "Content-Disposition": disposition,
            # NOTE(review): this explicit header presumably takes precedence
            # over media_type below - kept as in the original; confirm.
            "Content-Type": "application/octet-stream",
            "Content-Length": str(len(file_content)),
        }
        return StreamingResponse(BytesIO(file_content), media_type=content_type, headers=headers)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_image(file_name: str, flow_id: UUID):
    """Stream an image stored in a flow's folder.

    Args:
        file_name: Name of the image file inside the flow's folder.
        flow_id: Id of the flow whose folder holds the image.

    Returns:
        A ``StreamingResponse`` over the file content, typed with the content
        type built from the file extension.

    Raises:
        HTTPException: 500 if the extension is empty, no content type can be
            built for it, the content type does not start with ``image``, or
            reading the file fails.
    """
    storage_service = get_storage_service()
    folder = str(flow_id)

    # Text after the last "." (the whole name when there is no dot).
    suffix = file_name.rpartition(".")[2]
    if not suffix:
        raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}")

    try:
        media_type = build_content_type_from_extension(suffix)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not media_type:
        raise HTTPException(status_code=500, detail=f"Content type not found for extension {suffix}")
    if not media_type.startswith("image"):
        raise HTTPException(status_code=500, detail=f"Content type {media_type} is not an image")

    try:
        payload = await storage_service.get_file(flow_id=folder, file_name=file_name)
        return StreamingResponse(BytesIO(payload), media_type=media_type)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_profile_picture(
    folder_name: str,
    file_name: str,
):
    """Stream a profile picture from ``<config_dir>/profile_pictures``.

    Args:
        folder_name: Sub-folder of ``profile_pictures`` that holds the file.
        file_name: Name of the picture file inside that folder.

    Returns:
        A ``StreamingResponse`` over the file content, typed with the content
        type built from the file extension.

    Raises:
        HTTPException: 404 if the requested path resolves outside the
            ``profile_pictures`` directory, 500 on any other failure.
    """
    try:
        storage_service = get_storage_service()
        extension = file_name.split(".")[-1]
        config_dir = storage_service.settings_service.settings.config_dir
        config_path = Path(config_dir)  # type: ignore[arg-type]
        pictures_root = config_path / "profile_pictures"
        folder_path = pictures_root / folder_name

        # Both path parts come from the request. Reject anything that, once
        # resolved, is not located below the profile_pictures directory
        # (e.g. ".." segments or an absolute path replacing the root).
        # Assumes the storage service reads folder_path / file_name - TODO confirm.
        resolved_root = pictures_root.resolve()
        resolved_file = (folder_path / file_name).resolve()
        if resolved_root not in resolved_file.parents:
            raise HTTPException(status_code=404, detail="Profile picture not found")

        content_type = build_content_type_from_extension(extension)
        file_content = await storage_service.get_file(flow_id=folder_path, file_name=file_name)  # type: ignore[arg-type]
        return StreamingResponse(BytesIO(file_content), media_type=content_type)
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def list_profile_pictures():
    """List the available profile pictures.

    Reads the ``People`` and ``Space`` folders under
    ``<config_dir>/profile_pictures`` through the storage service.

    Returns:
        ``{"files": [...]}`` where each entry is ``People/<name>`` or
        ``Space/<name>``, people first.

    Raises:
        HTTPException: 500 if the storage service or either listing fails.
    """
    try:
        storage_service = get_storage_service()
        config_dir = storage_service.settings_service.settings.config_dir
        pictures_root = Path(config_dir) / "profile_pictures"  # type: ignore[arg-type]
        people = await storage_service.list_files(flow_id=pictures_root / "People")  # type: ignore[arg-type]
        space = await storage_service.list_files(flow_id=pictures_root / "Space")  # type: ignore[arg-type]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Prefix each entry with its folder so callers can tell the groups apart.
    files = [
        *(f"People/{name}" for name in people),
        *(f"Space/{name}" for name in space),
    ]
    return {"files": files}
async def list_files(
    flow_id: Annotated[UUID, Depends(get_flow_id)],
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
):
    """List the files stored for a flow.

    Args:
        flow_id: Flow id resolved through the ``get_flow_id`` dependency.
        storage_service: Storage backend that is queried.

    Returns:
        ``{"files": <listing returned by the storage service>}``.

    Raises:
        HTTPException: 500 if the listing fails.
    """
    try:
        listing = await storage_service.list_files(flow_id=str(flow_id))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    else:
        return {"files": listing}
async def delete_file(
    file_name: str,
    flow_id: Annotated[UUID, Depends(get_flow_id)],
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
):
    """Delete a file stored for a flow.

    Args:
        file_name: Name of the file to delete.
        flow_id: Flow id resolved through the ``get_flow_id`` dependency.
        storage_service: Storage backend that performs the deletion.

    Returns:
        ``{"message": "File <file_name> deleted successfully"}``.

    Raises:
        HTTPException: 500 if the deletion fails.
    """
    try:
        folder = str(flow_id)
        await storage_service.delete_file(flow_id=folder, file_name=file_name)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    else:
        confirmation = f"File {file_name} deleted successfully"
        return {"message": confirmation}