api / app.py
paola1's picture
Update app.py
5832dd0 verified
raw
history blame
2.37 kB
import mimetypes
import string
import uuid
from datetime import datetime, timedelta
from urllib.parse import quote

from fastapi import FastAPI, File, Form, HTTPException, Response, UploadFile
# Application object; the route decorators in this file register handlers on it.
app = FastAPI()
# In-memory storage (FOR DEMO ONLY).
# Replace with Redis, a NoSQL DB, or cloud storage for production.
# Maps an upload id (or a caller-chosen download link) to a record dict with
# the keys 'file_name', 'file_data', 'mime_type' and 'upload_time'.
data_storage = {}
@app.post("/upload/file")
async def upload_file(file: UploadFile = File(...)):
    """Keep an uploaded file in memory under a newly generated id.

    The complete payload is read into memory and saved in ``data_storage``
    together with the client-supplied file name, the declared content type
    and the upload time (naive UTC, as produced by ``datetime.utcnow()``).

    Args:
        file: The multipart file part sent by the client.

    Returns:
        A dict holding the generated ``upload_id`` and a confirmation message.
    """
    new_id = str(uuid.uuid4())
    original_name = file.filename
    payload = await file.read()

    record = dict(
        file_name=original_name,
        file_data=payload,
        mime_type=file.content_type,
        upload_time=datetime.utcnow(),
    )
    data_storage[new_id] = record

    return {"upload_id": new_id, "message": "File uploaded successfully"}
@app.post("/upload/custom")
async def upload_custom(file: UploadFile = File(...), download_link: str = Form(...)):
    """Store an uploaded file in memory under a caller-chosen download link.

    Args:
        file: The multipart file part sent by the client.
        download_link: Key under which the file is stored. It must be
            non-empty and either purely alphanumeric (``str.isalnum``) or
            made up only of ASCII letters, digits, ``-`` and ``_``.

    Returns:
        A dict echoing the ``download_link`` plus a confirmation message.

    Raises:
        HTTPException: 400 when the link is empty or contains characters
            outside the accepted set.

    Note:
        An existing entry stored under the same link is overwritten.
    """
    if not download_link:
        raise HTTPException(status_code=400, detail="Download link is required")

    # Validate the download_link to be URL-safe.  The alphanumeric shortcut is
    # kept so that every link accepted before is still accepted; the explicit
    # character set additionally admits '-' and '_'.
    url_safe_chars = set(string.ascii_letters + string.digits + '-_')
    is_valid = download_link.isalnum() or url_safe_chars.issuperset(download_link)
    if not is_valid:
        raise HTTPException(status_code=400, detail="Invalid download link format")

    # Assign the custom download link
    data_storage[download_link] = {
        'file_name': file.filename,
        'file_data': await file.read(),
        'mime_type': file.content_type,
        'upload_time': datetime.utcnow(),
    }
    return {"download_link": download_link, "message": "File uploaded successfully"}
def _content_disposition(file_name):
    """Build an ``attachment`` Content-Disposition value for any file name."""
    # Control characters (CR/LF in particular) would allow header injection,
    # and quotes/backslashes would terminate the quoted string, so anything
    # outside plain printable ASCII is replaced in the fallback name.
    fallback = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in file_name
    )
    header = f'attachment; filename="{fallback}"'
    if fallback != file_name:
        # Carry the exact name in the percent-encoded extended parameter so
        # clients that understand it can still restore the original name.
        header += f"; filename*=UTF-8''{quote(file_name, safe='')}"
    return header


@app.get("/download/{upload_id}")
async def download_file(upload_id: str):
    """Return a previously uploaded file as an attachment.

    Before the lookup, every stored entry older than five minutes is removed,
    so a file can only be downloaded within five minutes of its upload.

    Args:
        upload_id: The generated upload id or the custom download link the
            file was stored under.

    Returns:
        A ``Response`` carrying the raw file bytes.  The media type is the one
        declared at upload time, else a guess from the file name, else
        ``application/octet-stream``.

    Raises:
        HTTPException: 404 when no (unexpired) entry exists for ``upload_id``.
    """
    # Remove expired files
    cutoff = datetime.utcnow() - timedelta(minutes=5)
    expired_keys = [key for key, entry in data_storage.items() if entry['upload_time'] < cutoff]
    for key in expired_keys:
        del data_storage[key]

    upload_data = data_storage.get(upload_id)
    if upload_data is None:
        raise HTTPException(status_code=404, detail="Upload not found")

    # An upload without a file name would make mimetypes.guess_type() raise
    # and would put the text "None" into the header; fall back to the id.
    file_name = upload_data['file_name'] or upload_id
    mime_type = (
        upload_data['mime_type']
        or mimetypes.guess_type(file_name)[0]
        or "application/octet-stream"
    )
    return Response(
        content=upload_data['file_data'],
        media_type=mime_type,
        headers={"Content-Disposition": _content_disposition(file_name)},
    )
if __name__ == "__main__":
    # uvicorn is imported here so it is only required when this file is run
    # directly as a script.
    import uvicorn

    # Bind address 0.0.0.0 accepts connections on all IPv4 interfaces.
    bind_options = {"host": "0.0.0.0", "port": 8083}
    uvicorn.run(app, **bind_options)