File size: 2,883 Bytes
cbb9517 3457223 cbb9517 884a5ab 3457223 cbb9517 811e0e2 cbb9517 3457223 cbb9517 24b42cd 884a5ab cbb9517 884a5ab cbb9517 884a5ab cbb9517 3457223 3fee684 3457223 d259141 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
from fastapi import FastAPI, File, UploadFile, HTTPException, Response
from pydantic import BaseModel
from typing import Optional
import uuid
import os
import schedule
import time
import threading
# ASGI application object; the route decorators further down register on it.
app = FastAPI()

# Process-local, in-memory store keyed by upload id.
# NOT SUITABLE FOR PRODUCTION: contents are lost on restart and are not
# shared between worker processes. Consider Redis or an embedded NoSQL DB.
data_storage = dict()

# How long an upload stays valid, in seconds (15 minutes).
expiration_timeout = 60 * 15
def clean_expired_data():
    """Remove every upload older than ``expiration_timeout`` seconds.

    An entry is expired once ``now - entry['uploaded_at']`` is greater than
    or equal to ``expiration_timeout``. Expired entries are deleted from
    ``data_storage`` in place; the dict object itself is never replaced.
    """
    current_time = time.time()

    # This runs on the scheduler thread while request handlers insert into
    # the same dict. Iterate over a snapshot so a concurrent insert cannot
    # raise "dictionary changed size during iteration".
    expired_ids = [
        upload_id
        for upload_id, entry in list(data_storage.items())
        if current_time - entry['uploaded_at'] >= expiration_timeout
    ]

    # Delete in place instead of rebinding the global: rebuilding the dict
    # would silently drop uploads stored after the snapshot was taken.
    for upload_id in expired_ids:
        data_storage.pop(upload_id, None)

    print("Data storage cleaned.")
# Register clean_expired_data with the `schedule` library to run every
# 15 minutes. Registration alone executes nothing: the job only fires when
# schedule.run_pending() is called, which run_schedule() does in a loop.
# NOTE: the sweep interval equals expiration_timeout, so an entry can stay in
# data_storage for up to one extra interval after it has expired.
schedule.every(15).minutes.do(clean_expired_data)
def run_schedule():
    """Run due scheduled jobs forever, polling once per second.

    Meant to be used as the target of a background thread; it never returns.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            # `schedule` does not catch exceptions raised by jobs. Without
            # this guard a single failing run would end the loop, the thread
            # would exit, and expired uploads would never be cleaned again.
            print(f"Scheduled job failed: {exc!r}")
        time.sleep(1)
# Launch the scheduler loop on a background thread. It is marked as a daemon
# so it does not keep the process alive once the main thread exits.
thread = threading.Thread(target=run_schedule, daemon=True)
thread.start()
class TextUpload(BaseModel):
    """JSON request body accepted by the text upload endpoint."""

    # The text content to store.
    text: str

    # Name recorded for the stored text; "uploaded_text.txt" is used when the
    # field is omitted from the request.
    file_name: Optional[str] = "uploaded_text.txt"
@app.post("/upload/file")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file in memory and return the id used to fetch it.

    The whole file is read into memory and saved in ``data_storage`` under a
    freshly generated UUID4 string, together with its original filename and
    the upload timestamp.

    Returns:
        A dict with the new ``upload_id`` and a confirmation ``message``.
    """
    new_id = str(uuid.uuid4())
    payload = await file.read()

    # NOTE: the upload is kept in memory only. Writing it to the filesystem
    # (e.g. under uploads/) is an optional extension that is not done here.
    record = {
        'data_type': "file",
        'file_name': file.filename,
        'file_data': payload,
        'uploaded_at': time.time(),
    }
    data_storage[new_id] = record

    return {"upload_id": new_id, "message": "File uploaded successfully"}
@app.post("/upload/text")
async def upload_text(text_upload: TextUpload):
    """Store posted text in memory and return the id used to fetch it.

    The text is UTF-8 encoded and saved in ``data_storage`` under a freshly
    generated UUID4 string, together with the requested file name and the
    upload timestamp.

    Returns:
        A dict with the new ``upload_id`` and a confirmation ``message``.
    """
    new_id = str(uuid.uuid4())
    encoded_text = text_upload.text.encode('utf-8')

    record = {
        'data_type': "text",
        'file_name': text_upload.file_name,
        'file_data': encoded_text,
        'uploaded_at': time.time(),
    }
    data_storage[new_id] = record

    return {"upload_id": new_id, "message": "Text uploaded successfully"}
@app.get("/download/{upload_id}")
async def download_file(upload_id: str):
    """Return a stored upload as an attachment.

    Args:
        upload_id: Id previously returned by one of the upload endpoints.

    Returns:
        A ``Response`` carrying the stored bytes. Entries stored as "text"
        are served as ``text/plain``; everything else is served as
        ``application/octet-stream``.

    Raises:
        HTTPException: 404 when the id is unknown or the upload is older
            than ``expiration_timeout`` seconds.
    """
    from urllib.parse import quote

    # Single lookup: the cleanup thread may delete the entry at any moment,
    # so a separate "in" check followed by indexing could raise KeyError.
    upload_data = data_storage.get(upload_id)

    # Enforce the lifetime here as well. The periodic sweep only runs every
    # 15 minutes, so an expired entry may still be present in the store.
    if (
        upload_data is None
        or time.time() - upload_data['uploaded_at'] >= expiration_timeout
    ):
        data_storage.pop(upload_id, None)
        raise HTTPException(status_code=404, detail="Upload not found or has expired")

    if upload_data['data_type'] == "text":
        media_type = "text/plain"
    else:
        media_type = "application/octet-stream"

    # The file name is client-supplied. Build a quoted ASCII fallback with
    # control characters, quotes and backslashes replaced, plus an RFC 5987
    # percent-encoded `filename*` that carries the exact original name.
    file_name = upload_data['file_name'] or "download"
    ascii_name = "".join(
        ch if 32 <= ord(ch) < 127 and ch not in '"\\' else "_"
        for ch in file_name
    )
    disposition = (
        f'attachment; filename="{ascii_name}"; '
        f"filename*=UTF-8''{quote(file_name, safe='')}"
    )

    return Response(
        content=upload_data['file_data'],
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )
if __name__ == "__main__":
    # No scheduler call is needed here: the cleanup job is registered and its
    # background thread is started at module import time. The previous call
    # to an undefined `scheduled_function` raised NameError before the server
    # could start.
    import uvicorn

    # Serve the app on all interfaces, port 8083.
    uvicorn.run(app, host="0.0.0.0", port=8083)