|
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Response |
|
import uuid |
|
import mimetypes |
|
from datetime import datetime, timedelta |
|
|
|
# ASGI application object; the route decorators below register on it.
app = FastAPI()

# Process-local, in-memory store of uploads, keyed by upload id or custom
# download link. Each value is a dict with the keys 'file_name',
# 'file_data', 'mime_type' and 'upload_time'.
data_storage = dict()
|
|
|
@app.post("/upload/file")
async def upload_file(file: UploadFile = File(...)):
    """Keep an uploaded file in memory under a freshly generated UUID.

    The complete payload is read into memory and stored in
    ``data_storage`` together with the client-supplied filename, the
    content type and the naive-UTC time of the upload.

    Returns:
        A dict holding the generated ``upload_id`` and a confirmation
        message.
    """
    new_id = str(uuid.uuid4())

    record = {
        'file_name': file.filename,
        'file_data': await file.read(),
        'mime_type': file.content_type,
        'upload_time': datetime.utcnow(),
    }
    data_storage[new_id] = record

    response_body = {"upload_id": new_id, "message": "File uploaded successfully"}
    return response_body
|
|
|
@app.post("/upload/custom")
async def upload_custom(file: UploadFile = File(...), download_link: str = Form(...)):
    """Keep an uploaded file in memory under a caller-chosen download link.

    Args:
        file: The uploaded file; its full contents are read into memory.
        download_link: Key the file is stored under. It is accepted when it
            is non-empty and either entirely alphanumeric or composed only
            of ASCII letters, digits, ``-`` and ``_``.

    Returns:
        A dict echoing ``download_link`` together with a confirmation
        message.

    Raises:
        HTTPException: 400 when ``download_link`` is empty or contains a
            character outside the accepted set.
    """
    # Function-scope import: no module-level import of `string` is visible,
    # and without it the character check below fails with NameError.
    import string

    if not download_link:
        raise HTTPException(status_code=400, detail="Download link is required")

    allowed_chars = set(string.ascii_letters + string.digits + '-_')
    link_is_valid = download_link.isalnum() or allowed_chars.issuperset(download_link)
    if not link_is_valid:
        raise HTTPException(status_code=400, detail="Invalid download link format")

    # NOTE(review): an existing entry stored under the same link is silently
    # replaced here — confirm that overwriting is the intended behaviour.
    data_storage[download_link] = {
        'file_name': file.filename,
        'file_data': await file.read(),
        'mime_type': file.content_type,
        'upload_time': datetime.utcnow(),
    }
    return {"download_link": download_link, "message": "File uploaded successfully"}
|
|
|
# How long an upload remains downloadable before it is purged.
_UPLOAD_TTL = timedelta(minutes=5)


def _purge_expired_uploads():
    """Delete every stored upload whose upload time is older than the TTL."""
    cutoff = datetime.utcnow() - _UPLOAD_TTL
    # Collect the keys first: deleting from a dict while iterating it raises.
    expired_keys = [key for key, entry in data_storage.items() if entry['upload_time'] < cutoff]
    for key in expired_keys:
        del data_storage[key]


def _content_disposition(file_name):
    """Return an ``attachment`` Content-Disposition value for *file_name*.

    Names that survive percent-encoding unchanged are sent as a quoted
    ``filename``; anything else (spaces, quotes, control or non-ASCII
    characters) is sent percent-encoded via the ``filename*`` form so the
    header stays well-formed and ASCII-only.
    """
    from urllib.parse import quote

    encoded_name = quote(file_name, safe="")
    if encoded_name != file_name:
        return f"attachment; filename*=UTF-8''{encoded_name}"
    return f'attachment; filename="{file_name}"'


@app.get("/download/{upload_id}")
async def download_file(upload_id: str):
    """Return a previously uploaded file as an attachment.

    Expired uploads are purged before the lookup, so an upload older than
    the TTL is reported as missing even if it was still stored.

    Args:
        upload_id: The generated upload id or custom download link the file
            was stored under.

    Returns:
        A ``Response`` carrying the stored bytes, the stored (or guessed)
        media type and a Content-Disposition header naming the file.

    Raises:
        HTTPException: 404 when nothing is stored under ``upload_id``.
    """
    _purge_expired_uploads()

    try:
        upload_data = data_storage[upload_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Upload not found") from None

    # Fall back to a generic name when none was stored; a missing name would
    # otherwise break both the MIME guess and the header below.
    file_name = upload_data['file_name'] or "download"
    mime_type = (
        upload_data['mime_type']
        or mimetypes.guess_type(file_name)[0]
        or "application/octet-stream"
    )

    return Response(
        content=upload_data['file_data'],
        media_type=mime_type,
        headers={"Content-Disposition": _content_disposition(file_name)},
    )
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on every network
    # interface, port 8083. uvicorn is imported here so that importing this
    # module does not load it.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8083)