api / app.py
paola1's picture
Update app.py
2bb6627 verified
raw
history blame
3.67 kB
from fastapi import FastAPI, File, UploadFile, HTTPException, Response
import uuid
import mimetypes
import time
import threading
import schedule
app = FastAPI()
# **In-Memory Storage (FOR DEMO ONLY)**
# Replace with Redis, NoSQL DB, or Cloud Storage for Production
data_storage = {}
expiration_timeout = 5 * 60 # 5 minutes in seconds
def clean_expired_data():
global data_storage
current_time = time.time()
data_storage = {key: value for key, value in data_storage.items() if current_time - value['uploaded_at'] < expiration_timeout}
print("Data storage cleaned.")
# Schedule data cleaning every minute
schedule.every(1).minutes.do(clean_expired_data)
def run_schedule():
while True:
schedule.run_pending()
time.sleep(1)
# Start the scheduler in a separate thread
thread = threading.Thread(target=run_schedule)
thread.daemon = True # So that it dies when main thread dies
thread.start()
@app.post("/upload/file/auto-link")
async def upload_file_auto_link(file: UploadFile = File(...)):
upload_id = str(uuid.uuid4())
file_name = file.filename
mime_type = file.content_type
await _store_file(upload_id, file_name, await file.read(), mime_type)
return {"download_link": f"/download/auto/{upload_id}", "message": "File uploaded successfully"}
@app.post("/upload/file/custom-link")
async def upload_file_custom_link(link: str, file: UploadFile = File(...)):
# Ensure link doesn't collide and is URL-friendly (simplified for demo)
if link in [item['custom_link'] for item in data_storage.values() if 'custom_link' in item]:
link += f"_{str(uuid.uuid4())[:5]}" # Append unique suffix
print(f"Link collision detected. New link: {link}")
upload_id = str(uuid.uuid4())
file_name = file.filename
mime_type = file.content_type
await _store_file(upload_id, file_name, await file.read(), mime_type, custom_link=link)
return {"download_link": f"/download/custom/{link}", "message": "File uploaded successfully"}
@app.get("/download/auto/{upload_id}")
async def download_file_auto(upload_id: str):
return await _handle_download(upload_id)
@app.get("/download/custom/{link}")
async def download_file_custom(link: str):
for item in data_storage.values():
if 'custom_link' in item and item['custom_link'] == link:
return await _handle_download(item['upload_id'], item['file_name'], item['file_data'], item['mime_type'])
raise HTTPException(status_code=404, detail="File not found")
async def _store_file(upload_id, file_name, file_data, mime_type, custom_link=None):
global data_storage
data_storage[upload_id] = {
'upload_id': upload_id,
'file_name': file_name,
'file_data': file_data,
'ime_type': mime_type,
'uploaded_at': time.time(),
'custom_link': custom_link # Optional
}
async def _handle_download(upload_id, file_name=None, file_data=None, mime_type=None):
global data_storage
if upload_id not in data_storage:
raise HTTPException(status_code=404, detail="File not found or has expired")
if not file_name or not file_data or not mime_type:
upload_data = data_storage[upload_id]
file_name = upload_data['file_name']
file_data = upload_data['file_data']
mime_type = upload_data['mime_type'] or mimetypes.guess_type(file_name)[0] or "application/octet-stream"
return Response(content=file_data, media_type=mime_type,
headers={"Content-Disposition": f"attachment; filename={file_name}"})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8083)