|
from fastapi import FastAPI, File, UploadFile, HTTPException, Response |
|
import uuid |
|
import mimetypes |
|
import time |
|
import threading |
|
import schedule |
|
|
|
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# In-memory upload store. Each value is a dict describing one upload; the
# expiry sweep reads its 'uploaded_at' timestamp.
data_storage = dict()

# Lifetime of an upload, in seconds (five minutes).
expiration_timeout = 60 * 5
|
|
|
def clean_expired_data():
    """Drop every upload older than ``expiration_timeout`` seconds.

    Rebinds the module-level ``data_storage`` to a new dict holding only the
    entries whose ``'uploaded_at'`` timestamp is still within the timeout.
    """
    global data_storage

    now = time.time()

    # Iterate over a snapshot rather than the live dict: this function is
    # scheduled on a background thread, and an insert happening while the
    # live dict is being iterated raises "dictionary changed size during
    # iteration", which would abort the sweep.
    snapshot = list(data_storage.items())

    # Rebind instead of deleting in place so that any code currently
    # iterating the previous dict object is not disturbed.
    data_storage = {
        key: entry
        for key, entry in snapshot
        if now - entry['uploaded_at'] < expiration_timeout
    }

    print("Data storage cleaned.")
|
|
|
|
|
# Register the expiry sweep with the scheduler so it is due once per minute.
schedule.every(1).minutes.do(clean_expired_data)
|
|
|
def run_schedule():
    """Run due ``schedule`` jobs forever, polling once per second.

    Never returns, so it is meant to be run on its own thread.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            # An exception escaping this loop would end it, and no scheduled
            # job would ever run again for the lifetime of the process.
            # Report the failure and keep polling instead.
            print(f"Scheduled job failed: {exc!r}")
        time.sleep(1)
|
|
|
|
|
# Run the scheduler loop on a daemon thread so it does not keep the process
# alive once the main thread exits.
thread = threading.Thread(target=run_schedule, daemon=True)
thread.start()
|
|
|
@app.post("/upload/file/auto-link")
async def upload_file_auto_link(file: UploadFile = File(...)):
    """Store an uploaded file under a freshly generated id.

    Args:
        file: The uploaded file; its name, content type and full contents
            are stored.

    Returns:
        A dict with the ``download_link`` for the new upload and a
        confirmation ``message``.
    """
    new_id = str(uuid.uuid4())
    await _store_file(new_id, file.filename, await file.read(), file.content_type)
    return {
        "download_link": f"/download/auto/{new_id}",
        "message": "File uploaded successfully",
    }
|
|
|
@app.post("/upload/file/custom-link")
async def upload_file_custom_link(link: str, file: UploadFile = File(...)):
    """Store an uploaded file under a caller-chosen download link.

    If ``link`` is already used by a stored upload, a ``_`` plus five
    characters of a random UUID is appended to the requested link, and this
    is repeated until the result is unused.

    Args:
        link: Requested custom link.
        file: The uploaded file; its name, content type and full contents
            are stored.

    Returns:
        A dict with the final ``download_link`` (which may differ from the
        requested one after a collision) and a confirmation ``message``.
    """
    # Read the body before checking for collisions so that no suspension
    # point sits between the uniqueness check and the store below.
    contents = await file.read()

    taken_links = {entry.get('custom_link') for entry in data_storage.values()}
    if link in taken_links:
        requested = link
        # Re-check every candidate: a suffixed link can itself already be
        # taken, and download-by-link only ever returns the first match.
        while link in taken_links:
            link = f"{requested}_{str(uuid.uuid4())[:5]}"
        print(f"Link collision detected. New link: {link}")

    upload_id = str(uuid.uuid4())
    await _store_file(upload_id, file.filename, contents, file.content_type, custom_link=link)
    return {"download_link": f"/download/custom/{link}", "message": "File uploaded successfully"}
|
|
|
@app.get("/download/auto/{upload_id}")
async def download_file_auto(upload_id: str):
    """Serve the upload stored under ``upload_id``.

    All lookup and error handling is delegated to ``_handle_download``.
    """
    response = await _handle_download(upload_id)
    return response
|
|
|
@app.get("/download/custom/{link}")
async def download_file_custom(link: str):
    """Serve the first stored upload whose custom link equals ``link``.

    Raises:
        HTTPException: 404 when no stored upload has that custom link.
    """
    # First matching entry in storage order, or None when nothing matches.
    match = next(
        (
            entry
            for entry in data_storage.values()
            if 'custom_link' in entry and entry['custom_link'] == link
        ),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="File not found")

    return await _handle_download(
        match['upload_id'],
        match['file_name'],
        match['file_data'],
        match['mime_type'],
    )
|
|
|
async def _store_file(upload_id, file_name, file_data, mime_type, custom_link=None):
    """Record one upload in ``data_storage`` under ``upload_id``.

    Args:
        upload_id: Key for the new entry; an existing entry with the same
            key is overwritten.
        file_name: Name to store for the file.
        file_data: File contents to store.
        mime_type: Content type to store for the file.
        custom_link: Optional caller-chosen link; stored as ``None`` when
            not given.
    """
    data_storage[upload_id] = {
        'upload_id': upload_id,
        'file_name': file_name,
        'file_data': file_data,
        # Must be spelled 'mime_type': that is the key the download code
        # reads back (it was previously misspelled 'ime_type').
        'mime_type': mime_type,
        # Timestamp the expiry sweep compares against.
        'uploaded_at': time.time(),
        'custom_link': custom_link,
    }
|
|
|
async def _handle_download(upload_id, file_name=None, file_data=None, mime_type=None):
    """Build the attachment response for a stored upload.

    Args:
        upload_id: Key of the upload in ``data_storage``.
        file_name: Optional pre-fetched file name.
        file_data: Optional pre-fetched file contents.
        mime_type: Optional pre-fetched content type.

        If any of the three optional values is missing or falsy, all three
        are taken from the stored entry instead.

    Returns:
        A ``Response`` carrying the file contents, the content type as
        media type, and a ``Content-Disposition: attachment`` header.

    Raises:
        HTTPException: 404 when ``upload_id`` is not in ``data_storage``.
    """
    from urllib.parse import quote

    # One lookup instead of an "in" test followed by indexing, so an entry
    # that disappears between the two steps cannot surface as a KeyError.
    upload_data = data_storage.get(upload_id)
    if upload_data is None:
        raise HTTPException(status_code=404, detail="File not found or has expired")

    if not file_name or not file_data or not mime_type:
        file_name = upload_data['file_name']
        file_data = upload_data['file_data']
        # Fall back to guessing from the name, then to a generic binary
        # type. guess_type() rejects None, so only call it with a name.
        mime_type = (
            upload_data.get('mime_type')
            or (mimetypes.guess_type(file_name)[0] if file_name else None)
            or "application/octet-stream"
        )

    download_name = file_name or "download"
    encoded_name = quote(download_name, safe="")
    if encoded_name == download_name:
        # Only unreserved characters: safe to emit as a bare token.
        disposition = f"attachment; filename={download_name}"
    else:
        # Spaces, quotes, semicolons, control or non-ASCII characters would
        # corrupt the header (or fail to encode) if interpolated raw, so
        # send the percent-encoded extended form instead.
        disposition = f"attachment; filename*=UTF-8''{encoded_name}"

    return Response(
        content=file_data,
        media_type=mime_type,
        headers={"Content-Disposition": disposition},
    )
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8083.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8083}
    uvicorn.run(app, **server_options)