|
import os
import threading
import time
import uuid
from typing import Optional
from urllib.parse import quote

import schedule
from fastapi import FastAPI, File, HTTPException, Response, UploadFile
from pydantic import BaseModel
|
|
|
# ASGI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI()

# In-memory store of uploads, keyed by the upload id string. Each value is a
# dict with the keys 'data_type', 'file_name', 'file_data' and 'uploaded_at'
# (a time.time() timestamp).
data_storage = {}

# Age in seconds after which an upload counts as expired (15 minutes).
expiration_timeout = 15 * 60
|
|
|
def clean_expired_data():
    """Remove every upload whose age has reached ``expiration_timeout``.

    An entry is expired when ``now - entry['uploaded_at']`` is greater than
    or equal to ``expiration_timeout`` seconds. Expired entries are deleted
    from ``data_storage`` in place; the dict object itself is never replaced,
    so code holding a reference to it keeps seeing the live store.
    """
    current_time = time.time()

    # This runs on the scheduler thread while request handlers may insert
    # into the same dict. Work on a snapshot so a concurrent insert cannot
    # invalidate the iteration ("dictionary changed size during iteration").
    snapshot = list(data_storage.items())
    expired_keys = [
        key
        for key, value in snapshot
        if current_time - value['uploaded_at'] >= expiration_timeout
    ]

    for key in expired_keys:
        # pop() with a default tolerates the key having been removed already.
        data_storage.pop(key, None)

    print("Data storage cleaned.")
|
|
|
|
|
# Register the expiry sweep with the `schedule` library: run
# clean_expired_data once every 15 minutes.
# NOTE(review): the sweep interval equals expiration_timeout (15 minutes), so
# an entry can stay in data_storage for up to one extra interval after it has
# expired before a sweep removes it.
schedule.every(15).minutes.do(clean_expired_data)
|
|
|
def run_schedule():
    """Run pending ``schedule`` jobs forever, polling once per second.

    Intended as the target of a background thread; it never returns.
    A job that raises is reported and the loop keeps running, so one failing
    run does not permanently stop all future scheduled work.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            # Top-level boundary of the worker thread: without this, the
            # exception would end the thread and no job would ever run again.
            print(f"Scheduled job failed: {exc!r}")

        time.sleep(1)
|
|
|
|
|
# Start the scheduler loop on a background thread as soon as the module is
# imported. It is a daemon thread, so it does not keep the process alive.
thread = threading.Thread(target=run_schedule, daemon=True)
thread.start()
|
|
|
class TextUpload(BaseModel):
    # Request body accepted by the text upload endpoint.
    # (Documented with comments rather than a docstring so that the model
    # definition itself is left exactly as it was.)

    # The text content to store.
    text: str

    # Name reported for the stored text when it is downloaded; defaults to
    # "uploaded_text.txt" when the client does not supply one.
    file_name: Optional[str] = "uploaded_text.txt"
|
|
|
@app.post("/upload/file")
async def upload_file(file: UploadFile = File(...)):
    # Store an uploaded file in data_storage under a fresh UUID4 id and
    # return that id to the client. The whole file body is read into memory.
    upload_id = str(uuid.uuid4())

    # Build the record first; 'uploaded_at' is taken after the body has been
    # read, exactly as the fields are evaluated in order.
    record = {
        'data_type': "file",
        'file_name': file.filename,
        'file_data': await file.read(),
        'uploaded_at': time.time(),
    }
    data_storage[upload_id] = record

    return {"upload_id": upload_id, "message": "File uploaded successfully"}
|
|
|
@app.post("/upload/text")
async def upload_text(text_upload: TextUpload):
    # Store the submitted text (UTF-8 encoded) in data_storage under a fresh
    # UUID4 id and return that id to the client.
    upload_id = str(uuid.uuid4())

    record = {
        'data_type': "text",
        'file_name': text_upload.file_name,
        'file_data': text_upload.text.encode('utf-8'),
        'uploaded_at': time.time(),
    }
    data_storage[upload_id] = record

    return {"upload_id": upload_id, "message": "Text uploaded successfully"}
|
|
|
def _content_disposition(file_name):
    """Build an ``attachment`` Content-Disposition value for *file_name*."""
    if not file_name:
        # No usable name (None or empty): send a bare disposition rather
        # than a malformed or misleading ``filename=`` parameter.
        return "attachment"

    encoded = quote(file_name, safe="")
    if encoded == file_name:
        # Only letters, digits and "_.-~": safe to send verbatim.
        return f"attachment; filename={file_name}"

    # The name is client-supplied. Spaces, quotes, separators, control or
    # non-ASCII characters must not be placed raw in the header, so use the
    # percent-encoded extended form (RFC 6266 / RFC 5987).
    return f"attachment; filename*=UTF-8''{encoded}"


@app.get("/download/{upload_id}")
async def download_file(upload_id: str):
    """Download a previously uploaded file or text as an attachment.

    Responds with 404 when the id is unknown or the upload is older than the
    expiration timeout.
    """
    upload_data = data_storage.get(upload_id)
    if upload_data is None:
        raise HTTPException(status_code=404, detail="Upload not found or has expired")

    # Enforce the timeout at read time as well: the periodic sweep only runs
    # at intervals, so an expired entry may still be present in the store.
    if time.time() - upload_data['uploaded_at'] >= expiration_timeout:
        data_storage.pop(upload_id, None)
        raise HTTPException(status_code=404, detail="Upload not found or has expired")

    # Text uploads are served as plain text; everything else as raw bytes.
    if upload_data['data_type'] == "text":
        media_type = "text/plain"
    else:
        media_type = "application/octet-stream"

    return Response(
        content=upload_data['file_data'],
        media_type=media_type,
        headers={"Content-Disposition": _content_disposition(upload_data['file_name'])},
    )
|
|
|
if __name__ == "__main__":
    # The previous call to scheduled_function() referenced a name that is
    # not defined or imported in this module and raised NameError before the
    # server could start. No extra call is needed here: the scheduler thread
    # is started at module import time.
    import uvicorn

    # Serve the app on all interfaces, port 8083.
    uvicorn.run(app, host="0.0.0.0", port=8083)