from fastapi import FastAPI, File, UploadFile, HTTPException, Response import uuid import mimetypes import time import threading import schedule app = FastAPI() # **In-Memory Storage (FOR DEMO ONLY)** # Replace with Redis, NoSQL DB, or Cloud Storage for Production data_storage = {} expiration_timeout = 5 * 60 # 5 minutes in seconds def clean_expired_data(): global data_storage current_time = time.time() data_storage = {key: value for key, value in data_storage.items() if current_time - value['uploaded_at'] < expiration_timeout} print("Data storage cleaned.") # Schedule data cleaning every minute schedule.every(1).minutes.do(clean_expired_data) def run_schedule(): while True: schedule.run_pending() time.sleep(1) # Start the scheduler in a separate thread thread = threading.Thread(target=run_schedule) thread.daemon = True # So that it dies when main thread dies thread.start() @app.post("/upload/file/auto-link") async def upload_file_auto_link(file: UploadFile = File(...)): upload_id = str(uuid.uuid4()) file_name = file.filename mime_type = file.content_type await _store_file(upload_id, file_name, await file.read(), mime_type) return {"download_link": f"/download/auto/{upload_id}", "message": "File uploaded successfully"} @app.post("/upload/file/custom-link") async def upload_file_custom_link(link: str, file: UploadFile = File(...)): # Ensure link doesn't collide and is URL-friendly (simplified for demo) if link in [item['custom_link'] for item in data_storage.values() if 'custom_link' in item]: link += f"_{str(uuid.uuid4())[:5]}" # Append unique suffix print(f"Link collision detected. New link: {link}") upload_id = str(uuid.uuid4()) file_name = file.filename mime_type = file.content_type await _store_file(upload_id, file_name, await file.read(), mime_type, custom_link=link) return {"download_link": f"/download/custom/{link}", "message": "File uploaded successfully"} @app.get("/download/auto/{upload_id}") async def download_file_auto(upload_id: str): return await _handle_download(upload_id) @app.get("/download/custom/{link}") async def download_file_custom(link: str): for item in data_storage.values(): if 'custom_link' in item and item['custom_link'] == link: return await _handle_download(item['upload_id'], item['file_name'], item['file_data'], item['mime_type']) raise HTTPException(status_code=404, detail="File not found") async def _store_file(upload_id, file_name, file_data, mime_type, custom_link=None): global data_storage data_storage[upload_id] = { 'upload_id': upload_id, 'file_name': file_name, 'file_data': file_data, 'ime_type': mime_type, 'uploaded_at': time.time(), 'custom_link': custom_link # Optional } async def _handle_download(upload_id, file_name=None, file_data=None, mime_type=None): global data_storage if upload_id not in data_storage: raise HTTPException(status_code=404, detail="File not found or has expired") if not file_name or not file_data or not mime_type: upload_data = data_storage[upload_id] file_name = upload_data['file_name'] file_data = upload_data['file_data'] mime_type = upload_data['mime_type'] or mimetypes.guess_type(file_name)[0] or "application/octet-stream" return Response(content=file_data, media_type=mime_type, headers={"Content-Disposition": f"attachment; filename={file_name}"}) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8083)