# File size: 3,668 Bytes
from fastapi import FastAPI, File, UploadFile, HTTPException, Response
import uuid
import mimetypes
import time
import threading
import schedule
# Application instance; the route decorators in this module register on it.
app = FastAPI()

# In-memory storage (FOR DEMO ONLY).
# Replace with Redis, a NoSQL DB, or cloud storage for production.
# Keyed by upload id; each value is the record dict written by _store_file.
data_storage: dict = {}

# How long an upload is kept before clean_expired_data drops it:
# 5 minutes, expressed in seconds.
expiration_timeout: int = 5 * 60
def clean_expired_data():
    """Remove every upload that is at least ``expiration_timeout`` seconds old.

    This runs on the scheduler thread while request handlers may be adding
    entries to ``data_storage``. Expired keys are therefore deleted from the
    existing dict in place, working from a snapshot of its items, rather
    than rebinding the global to a filtered copy: a rebuild would silently
    drop any upload stored while the copy was being built, and iterating the
    live dict can raise ``RuntimeError`` if its size changes mid-iteration.
    """
    current_time = time.time()
    # list(...) snapshots the items so concurrent inserts cannot break the scan.
    expired_keys = [
        key
        for key, value in list(data_storage.items())
        if current_time - value['uploaded_at'] >= expiration_timeout
    ]
    for key in expired_keys:
        # pop with a default: tolerate the entry having vanished meanwhile.
        data_storage.pop(key, None)
    print("Data storage cleaned.")
# Schedule data cleaning every minute.
# This only registers the job; it is executed by the schedule.run_pending()
# calls made in run_schedule.
schedule.every(1).minutes.do(clean_expired_data)
def run_schedule():
    """Run pending ``schedule`` jobs forever, polling once per second.

    Meant to be the target of a background thread; it never returns.
    ``schedule.run_pending()`` lets exceptions raised by a job propagate,
    which would end this loop (and with it all future cleanups) on the
    first failure. The error is reported instead and polling continues.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as exc:  # thread boundary: report and keep polling
            print(f"Scheduled job failed: {exc!r}")
        time.sleep(1)
# Run the scheduler loop on a background thread. It is a daemon thread so
# that it dies together with the main thread instead of keeping the
# process alive.
thread = threading.Thread(target=run_schedule, daemon=True)
thread.start()
@app.post("/upload/file/auto-link")
async def upload_file_auto_link(file: UploadFile = File(...)):
    """Store an uploaded file under a freshly generated id.

    Args:
        file: The uploaded file; its whole content is read into memory.

    Returns:
        A dict with ``download_link`` (``/download/auto/<id>``) and a
        success ``message``.
    """
    new_id = str(uuid.uuid4())
    payload = await file.read()
    await _store_file(new_id, file.filename, payload, file.content_type)
    return {
        "download_link": f"/download/auto/{new_id}",
        "message": "File uploaded successfully",
    }
@app.post("/upload/file/custom-link")
async def upload_file_custom_link(link: str, file: UploadFile = File(...)):
    """Store an uploaded file and expose it under a caller-chosen link.

    If ``link`` is already used by a stored upload, an underscore and the
    first five characters of a new UUID are appended to it.

    Args:
        link: Requested link name.
        file: The uploaded file; its whole content is read into memory.

    Returns:
        A dict with ``download_link`` (``/download/custom/<link>``, using
        the possibly suffixed link) and a success ``message``.
    """
    # Collision check only; the link is not validated as URL-friendly
    # (simplified for demo).
    already_taken = any(
        'custom_link' in record and record['custom_link'] == link
        for record in data_storage.values()
    )
    if already_taken:
        suffix = str(uuid.uuid4())[:5]
        link = f"{link}_{suffix}"  # Append unique suffix
        print(f"Link collision detected. New link: {link}")

    new_id = str(uuid.uuid4())
    payload = await file.read()
    await _store_file(new_id, file.filename, payload, file.content_type, custom_link=link)
    return {
        "download_link": f"/download/custom/{link}",
        "message": "File uploaded successfully",
    }
@app.get("/download/auto/{upload_id}")
async def download_file_auto(upload_id: str):
    """Serve the upload stored under ``upload_id``.

    All lookup and response building is delegated to ``_handle_download``,
    including the 404 raised for an unknown id.
    """
    response = await _handle_download(upload_id)
    return response
@app.get("/download/custom/{link}")
async def download_file_custom(link: str):
    """Serve the first stored upload whose custom link equals ``link``.

    Raises:
        HTTPException: 404 when no stored upload carries that link.
    """
    # Linear scan: records are keyed by upload id, not by custom link.
    match = next(
        (
            record
            for record in data_storage.values()
            if 'custom_link' in record and record['custom_link'] == link
        ),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="File not found")
    return await _handle_download(
        match['upload_id'],
        match['file_name'],
        match['file_data'],
        match['mime_type'],
    )
async def _store_file(upload_id, file_name, file_data, mime_type, custom_link=None):
    """Save one upload record in ``data_storage`` under ``upload_id``.

    Args:
        upload_id: Key for the record; an existing record with the same
            key is overwritten.
        file_name: Original file name of the upload.
        file_data: Full file content.
        mime_type: Content type reported for the upload (may be ``None``).
        custom_link: Caller-chosen link name, or ``None`` for uploads that
            are only reachable by id.
    """
    # Item assignment mutates the module-level dict, so no ``global``
    # declaration is needed.
    data_storage[upload_id] = {
        'upload_id': upload_id,
        'file_name': file_name,
        'file_data': file_data,
        # Must be spelled 'mime_type': that is the key the download
        # handlers read back.
        'mime_type': mime_type,
        # Timestamp compared against expiration_timeout by the cleaner.
        'uploaded_at': time.time(),
        'custom_link': custom_link,  # Optional
    }
def _content_disposition(file_name):
    """Build an ``attachment`` Content-Disposition value for ``file_name``."""
    from urllib.parse import quote  # local import: only this helper needs it

    name = str(file_name)
    encoded = quote(name)
    if encoded != name:
        # The name holds spaces, quotes, control or non-ASCII characters.
        # Percent-encode it in the extended ``filename*`` form so the header
        # stays pure ASCII and cannot be broken or split by the file name.
        return f"attachment; filename*=utf-8''{encoded}"
    return f'attachment; filename="{name}"'


async def _handle_download(upload_id, file_name=None, file_data=None, mime_type=None):
    """Build the download response for the upload stored under ``upload_id``.

    Args:
        upload_id: Key of the record in ``data_storage``.
        file_name: Name to offer to the client.
        file_data: Content to send.
        mime_type: Media type to send.

    If any of the three optional values is missing or empty, all three are
    taken from the stored record instead; the media type then falls back to
    a guess from the file name and finally to ``application/octet-stream``.

    Returns:
        A ``Response`` carrying the file content as an attachment.

    Raises:
        HTTPException: 404 when ``upload_id`` is not in ``data_storage``.
    """
    # Single lookup: a membership test followed by indexing could fail in
    # between if the cleaner thread removes the entry.
    record = data_storage.get(upload_id)
    if record is None:
        raise HTTPException(status_code=404, detail="File not found or has expired")

    if not file_name or not file_data or not mime_type:
        file_name = record['file_name']
        file_data = record['file_data']
        mime_type = (
            record.get('mime_type')
            # guess_type needs a string, so guard against a missing name.
            or mimetypes.guess_type(file_name or "")[0]
            or "application/octet-stream"
        )

    return Response(
        content=file_data,
        media_type=mime_type,
        headers={"Content-Disposition": _content_disposition(file_name)},
    )
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Serve the app on all interfaces, port 8083.
    uvicorn.run(app, host="0.0.0.0", port=8083)