"""Temporary upload service: store files or text in memory and serve them back.

Uploads are kept in a process-local dictionary and become unavailable
``expiration_timeout`` seconds after they were stored. A background thread
purges expired entries periodically; downloads additionally check the age of
the requested entry so the timeout is enforced between purges as well.
"""
from fastapi import FastAPI, File, UploadFile, HTTPException, Response
from pydantic import BaseModel
from typing import Optional
from urllib.parse import quote
import uuid
import os
import schedule
import time
import threading

app = FastAPI()

# In-memory storage (NOT SUITABLE FOR PRODUCTION)
# Consider Redis or an embedded NoSQL DB for production
data_storage = {}
expiration_timeout = 15 * 60  # 15 minutes in seconds

# The cleanup job runs on the scheduler thread while request handlers run on
# the server's event-loop thread, so every access to data_storage is guarded.
_storage_lock = threading.Lock()

# Response media type for each stored 'data_type' value.
_MEDIA_TYPES = {
    "file": "application/octet-stream",
    "text": "text/plain",
}


def _is_expired(entry: dict, now: Optional[float] = None) -> bool:
    """Return True when *entry* is at least ``expiration_timeout`` seconds old."""
    if now is None:
        now = time.time()
    return now - entry['uploaded_at'] >= expiration_timeout


def _content_disposition(file_name: Optional[str]) -> str:
    """Build a safe ``Content-Disposition: attachment`` header value.

    The client-supplied name is never interpolated raw: the plain ``filename``
    parameter carries a quoted, printable-ASCII fallback, and ``filename*``
    carries the full name percent-encoded as UTF-8 (RFC 6266 / RFC 5987).
    """
    name = file_name or "download"
    ascii_name = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in name
    )
    encoded_name = quote(name, safe="")
    return f"attachment; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}"


def _store(data_type: str, file_name: Optional[str], payload: bytes) -> str:
    """Save *payload* under a freshly generated upload id and return that id."""
    upload_id = str(uuid.uuid4())
    entry = {
        'data_type': data_type,
        'file_name': file_name,
        'file_data': payload,
        'uploaded_at': time.time(),
    }
    with _storage_lock:
        data_storage[upload_id] = entry
    return upload_id


def clean_expired_data() -> None:
    """Remove every stored upload whose age has reached ``expiration_timeout``.

    Entries are deleted in place (the dictionary object is not replaced) and
    under the storage lock, so an upload arriving during the sweep is neither
    lost nor able to break the iteration.
    """
    now = time.time()
    with _storage_lock:
        expired_ids = [
            upload_id
            for upload_id, entry in data_storage.items()
            if _is_expired(entry, now)
        ]
        for upload_id in expired_ids:
            del data_storage[upload_id]
    print("Data storage cleaned.")


# Schedule data cleaning every 15 minutes
schedule.every(15).minutes.do(clean_expired_data)


def run_schedule() -> None:
    """Run pending scheduled jobs forever, polling once per second."""
    while True:
        schedule.run_pending()
        time.sleep(1)


# Start the scheduler in a separate thread
thread = threading.Thread(target=run_schedule)
thread.daemon = True  # So that it dies when main thread dies
thread.start()


class TextUpload(BaseModel):
    """Request body for ``POST /upload/text``."""

    # Text content to store; it is encoded as UTF-8 before being saved.
    text: str
    # Name offered to the client when the text is downloaded again.
    file_name: Optional[str] = "uploaded_text.txt"


@app.post("/upload/file")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file in memory and return its upload id.

    The whole file is read into memory. The client-supplied filename is only
    kept as metadata for the download header.
    """
    # NOTE: if uploads are ever persisted to disk, do not use file.filename as
    # the path; it is client-controlled.
    payload = await file.read()  # read before taking the lock; this may suspend
    upload_id = _store("file", file.filename, payload)
    return {"upload_id": upload_id, "message": "File uploaded successfully"}


@app.post("/upload/text")
async def upload_text(text_upload: TextUpload):
    """Store the submitted text (UTF-8 encoded) and return its upload id."""
    upload_id = _store(
        "text",
        text_upload.file_name,
        text_upload.text.encode('utf-8'),
    )
    return {"upload_id": upload_id, "message": "Text uploaded successfully"}


@app.get("/download/{upload_id}")
async def download_file(upload_id: str):
    """Return a stored upload as an attachment.

    Raises:
        HTTPException: 404 when the id is unknown or the upload has expired.
    """
    with _storage_lock:
        upload_data = data_storage.get(upload_id)
        # The periodic sweep may not have run yet, so enforce the timeout here
        # too and drop the stale entry straight away.
        if upload_data is not None and _is_expired(upload_data):
            del data_storage[upload_id]
            upload_data = None

    if upload_data is None:
        raise HTTPException(status_code=404, detail="Upload not found or has expired")

    media_type = _MEDIA_TYPES.get(upload_data['data_type'], "application/octet-stream")
    return Response(
        content=upload_data['file_data'],
        media_type=media_type,
        headers={"Content-Disposition": _content_disposition(upload_data['file_name'])},
    )


if __name__ == "__main__":
    # The cleanup scheduler thread is already started at import time above,
    # so only the web server needs to be launched here.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8083)