File size: 3,668 Bytes
cbb9517
 
2bb6627
884a5ab
 
2bb6627
3457223
 
 
2bb6627
 
cbb9517
2bb6627
811e0e2
cbb9517
 
 
 
 
3457223
2bb6627
 
24b42cd
884a5ab
 
 
 
 
cbb9517
884a5ab
cbb9517
884a5ab
 
2bb6627
 
 
 
 
 
 
cbb9517
2bb6627
 
 
 
 
 
 
cbb9517
2bb6627
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cbb9517
2bb6627
cbb9517
 
2bb6627
 
 
 
 
 
cbb9517
 
2bb6627
cbb9517
 
2bb6627
85fe8e5
2bb6627
 
 
 
 
85fe8e5
2bb6627
85fe8e5
cbb9517
3457223
 
d259141
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from fastapi import FastAPI, File, UploadFile, HTTPException, Response
import uuid
import mimetypes
import time
import threading
import schedule

# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()

# In-memory storage (FOR DEMO ONLY).
# Replace with Redis, a NoSQL database, or cloud storage for production.
# Maps upload_id -> the record dict written by _store_file.
data_storage = {}
# Maximum age of a stored upload before clean_expired_data drops it.
expiration_timeout = 5 * 60  # 5 minutes in seconds

def clean_expired_data():
    """Drop every stored upload that is at least ``expiration_timeout`` seconds old.

    The surviving records are collected into a new dict which then replaces
    the module-level ``data_storage``; a record is kept only while
    ``now - record['uploaded_at'] < expiration_timeout``.
    """
    global data_storage
    now = time.time()
    # Scan a snapshot rather than the live dict: this function is run from the
    # scheduler thread, and iterating the shared dict while a request inserts
    # a new upload can raise "dictionary changed size during iteration".
    snapshot = list(data_storage.items())
    # NOTE(review): an upload stored between the snapshot above and the rebind
    # below is not carried over; closing that window needs a lock shared with
    # the writers.
    data_storage = {
        upload_id: record
        for upload_id, record in snapshot
        if now - record['uploaded_at'] < expiration_timeout
    }
    print("Data storage cleaned.")

# Register the expiry sweep with the scheduler: one run per minute.
schedule.every().minute.do(clean_expired_data)

def run_schedule():
    """Poll the scheduler once per second, forever.

    Intended as a thread target: it never returns. A job that raises is
    reported and the loop keeps going, so a single failing run cannot
    permanently stop future scheduled runs.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            # Top-level boundary of the worker loop: report and carry on
            # instead of letting the exception end the loop.
            print(f"Scheduled job failed: {exc!r}")
        time.sleep(1)

# Run the scheduler loop on a background thread. It is a daemon thread so it
# does not keep the process alive once the main thread exits.
thread = threading.Thread(target=run_schedule, daemon=True)
thread.start()

@app.post("/upload/file/auto-link")
async def upload_file_auto_link(file: UploadFile = File(...)):
    # Store the uploaded file under a freshly generated UUID and answer with
    # the path it can be downloaded from.
    generated_id = str(uuid.uuid4())
    original_name, declared_type = file.filename, file.content_type
    contents = await file.read()
    await _store_file(generated_id, original_name, contents, declared_type)
    response_body = {
        "download_link": f"/download/auto/{generated_id}",
        "message": "File uploaded successfully",
    }
    return response_body

@app.post("/upload/file/custom-link")
async def upload_file_custom_link(link: str, file: UploadFile = File(...)):
    # Store the uploaded file under a caller-chosen link name and answer with
    # the path it can be downloaded from.
    #
    # link: requested custom link name. If it is already in use, a short
    #       random suffix is appended until the name is unique.
    # Raises HTTPException(400) when the link is empty or contains "/".
    from urllib.parse import quote

    # The download route is /download/custom/{link}; an empty name or one
    # containing "/" would be stored but could not be requested through that
    # single-segment path, so refuse it up front.
    if not link or "/" in link:
        raise HTTPException(
            status_code=400,
            detail="Custom link must be non-empty and must not contain '/'",
        )

    # Collect the links currently in use once, then retry until the candidate
    # is free (the suffixed name is re-checked as well).
    taken_links = {item.get('custom_link') for item in list(data_storage.values())}
    requested_link = link
    while link in taken_links:
        link = f"{requested_link}_{str(uuid.uuid4())[:5]}"  # Append unique suffix
        print(f"Link collision detected. New link: {link}")

    upload_id = str(uuid.uuid4())
    file_name = file.filename
    mime_type = file.content_type
    await _store_file(upload_id, file_name, await file.read(), mime_type, custom_link=link)

    # Percent-encode the link so the returned path is a valid URL even when
    # the name contains spaces or other reserved characters.
    encoded_link = quote(link, safe="")
    return {"download_link": f"/download/custom/{encoded_link}", "message": "File uploaded successfully"}

@app.get("/download/auto/{upload_id}")
async def download_file_auto(upload_id: str):
    # Serve the upload stored under upload_id; the lookup and the 404 for an
    # unknown id are handled by _handle_download.
    download_response = await _handle_download(upload_id)
    return download_response

@app.get("/download/custom/{link}")
async def download_file_custom(link: str):
    # Find the first stored record whose custom link equals the requested one
    # and serve it; answer 404 when no record carries that link.
    matching_record = next(
        (
            record
            for record in data_storage.values()
            if 'custom_link' in record and record['custom_link'] == link
        ),
        None,
    )
    if matching_record is None:
        raise HTTPException(status_code=404, detail="File not found")
    return await _handle_download(
        matching_record['upload_id'],
        matching_record['file_name'],
        matching_record['file_data'],
        matching_record['mime_type'],
    )

async def _store_file(upload_id, file_name, file_data, mime_type, custom_link=None):
    """Record an upload in ``data_storage`` under ``upload_id``.

    Args:
        upload_id: Key of the new record.
        file_name: Name reported by the client for the uploaded file.
        file_data: Raw content of the upload.
        mime_type: Content type reported by the client.
        custom_link: Optional caller-chosen link name; ``None`` when the
            upload is only reachable through its ``upload_id``.

    The record also carries ``uploaded_at`` (``time.time()`` at store time),
    which the expiry sweep compares against ``expiration_timeout``.
    """
    data_storage[upload_id] = {
        'upload_id': upload_id,
        'file_name': file_name,
        'file_data': file_data,
        # The download code reads this entry as 'mime_type'; the key must
        # match exactly or every download fails with KeyError.
        'mime_type': mime_type,
        'uploaded_at': time.time(),
        'custom_link': custom_link,  # Optional
    }

def _content_disposition(file_name):
    """Build an ``attachment`` Content-Disposition value for *file_name*."""
    if not file_name:
        return "attachment"
    from urllib.parse import quote

    encoded_name = quote(file_name, safe="")
    if encoded_name == file_name:
        # Only unreserved characters: the plain form is already valid.
        return f"attachment; filename={file_name}"
    # Spaces, quotes, ';' or non-ASCII characters would corrupt the plain
    # form, so send the percent-encoded UTF-8 form instead.
    return f"attachment; filename*=UTF-8''{encoded_name}"


async def _handle_download(upload_id, file_name=None, file_data=None, mime_type=None):
    """Build the download response for a stored upload.

    Args:
        upload_id: Key of the record in ``data_storage``.
        file_name, file_data, mime_type: Optional pre-fetched record fields.
            When any of them is missing or empty, all three are taken from
            the stored record instead.

    Returns:
        A ``Response`` carrying the file content as an attachment.

    Raises:
        HTTPException: 404 when ``upload_id`` is not in ``data_storage``.
    """
    # Single lookup instead of "in" followed by indexing, so the record
    # cannot disappear between the check and the read.
    record = data_storage.get(upload_id)
    if record is None:
        raise HTTPException(status_code=404, detail="File not found or has expired")

    if not file_name or not file_data or not mime_type:
        file_name = record['file_name']
        file_data = record['file_data']
        # Prefer the stored type, then a guess from the file name, then a
        # generic binary type. A missing name is guessed as "" so that
        # guess_type is never handed None.
        mime_type = (
            record.get('mime_type')
            or mimetypes.guess_type(file_name or "")[0]
            or "application/octet-stream"
        )

    return Response(
        content=file_data,
        media_type=mime_type,
        headers={"Content-Disposition": _content_disposition(file_name)},
    )

if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a script.
    import uvicorn
    # Serve the app on all network interfaces, port 8083.
    uvicorn.run(app, host="0.0.0.0", port=8083)