paola1 commited on
Commit
410351d
·
verified ·
1 Parent(s): 2bb6627

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +40 -72
app.py CHANGED
@@ -1,91 +1,59 @@
1
- from fastapi import FastAPI, File, UploadFile, HTTPException, Response
2
  import uuid
3
  import mimetypes
4
- import time
5
- import threading
6
- import schedule
7
 
8
  app = FastAPI()
9
 
10
  # **In-Memory Storage (FOR DEMO ONLY)**
11
  # Replace with Redis, NoSQL DB, or Cloud Storage for Production
12
  data_storage = {}
13
- expiration_timeout = 5 * 60 # 5 minutes in seconds
14
 
15
- def clean_expired_data():
16
- global data_storage
17
- current_time = time.time()
18
- data_storage = {key: value for key, value in data_storage.items() if current_time - value['uploaded_at'] < expiration_timeout}
19
- print("Data storage cleaned.")
20
-
21
- # Schedule data cleaning every minute
22
- schedule.every(1).minutes.do(clean_expired_data)
23
-
24
- def run_schedule():
25
- while True:
26
- schedule.run_pending()
27
- time.sleep(1)
28
-
29
- # Start the scheduler in a separate thread
30
- thread = threading.Thread(target=run_schedule)
31
- thread.daemon = True # So that it dies when main thread dies
32
- thread.start()
33
-
34
- @app.post("/upload/file/auto-link")
35
- async def upload_file_auto_link(file: UploadFile = File(...)):
36
- upload_id = str(uuid.uuid4())
37
- file_name = file.filename
38
- mime_type = file.content_type
39
- await _store_file(upload_id, file_name, await file.read(), mime_type)
40
- return {"download_link": f"/download/auto/{upload_id}", "message": "File uploaded successfully"}
41
-
42
- @app.post("/upload/file/custom-link")
43
- async def upload_file_custom_link(link: str, file: UploadFile = File(...)):
44
- # Ensure link doesn't collide and is URL-friendly (simplified for demo)
45
- if link in [item['custom_link'] for item in data_storage.values() if 'custom_link' in item]:
46
- link += f"_{str(uuid.uuid4())[:5]}" # Append unique suffix
47
- print(f"Link collision detected. New link: {link}")
48
-
49
  upload_id = str(uuid.uuid4())
50
- file_name = file.filename
51
- mime_type = file.content_type
52
- await _store_file(upload_id, file_name, await file.read(), mime_type, custom_link=link)
53
- return {"download_link": f"/download/custom/{link}", "message": "File uploaded successfully"}
54
-
55
- @app.get("/download/auto/{upload_id}")
56
- async def download_file_auto(upload_id: str):
57
- return await _handle_download(upload_id)
58
-
59
- @app.get("/download/custom/{link}")
60
- async def download_file_custom(link: str):
61
- for item in data_storage.values():
62
- if 'custom_link' in item and item['custom_link'] == link:
63
- return await _handle_download(item['upload_id'], item['file_name'], item['file_data'], item['mime_type'])
64
- raise HTTPException(status_code=404, detail="File not found")
65
-
66
- async def _store_file(upload_id, file_name, file_data, mime_type, custom_link=None):
67
- global data_storage
68
  data_storage[upload_id] = {
69
- 'upload_id': upload_id,
70
- 'file_name': file_name,
71
- 'file_data': file_data,
72
- 'ime_type': mime_type,
73
- 'uploaded_at': time.time(),
74
- 'custom_link': custom_link # Optional
75
  }
 
76
 
77
- async def _handle_download(upload_id, file_name=None, file_data=None, mime_type=None):
78
- global data_storage
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  if upload_id not in data_storage:
80
- raise HTTPException(status_code=404, detail="File not found or has expired")
81
 
82
- if not file_name or not file_data or not mime_type:
83
- upload_data = data_storage[upload_id]
84
- file_name = upload_data['file_name']
85
- file_data = upload_data['file_data']
86
- mime_type = upload_data['mime_type'] or mimetypes.guess_type(file_name)[0] or "application/octet-stream"
87
 
88
- return Response(content=file_data, media_type=mime_type,
89
  headers={"Content-Disposition": f"attachment; filename={file_name}"})
90
 
91
  if __name__ == "__main__":
 
1
+ from fastapi import FastAPI, File, UploadFile, HTTPException, Form
2
  import uuid
3
  import mimetypes
4
+ from datetime import datetime, timedelta
 
 
5
 
6
  app = FastAPI()
7
 
8
  # **In-Memory Storage (FOR DEMO ONLY)**
9
  # Replace with Redis, NoSQL DB, or Cloud Storage for Production
10
  data_storage = {}
 
11
 
12
+ @app.post("/upload/file")
13
+ async def upload_file(file: UploadFile = File(...)):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  upload_id = str(uuid.uuid4())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  data_storage[upload_id] = {
16
+ 'file_name': file.filename,
17
+ 'file_data': await file.read(),
18
+ 'mime_type': file.content_type,
19
+ 'upload_time': datetime.datetime.utcnow()
 
 
20
  }
21
+ return {"upload_id": upload_id, "message": "File uploaded successfully"}
22
 
23
+ @app.post("/upload/custom")
24
+ async def upload_custom(file: UploadFile = File(...), download_link: str = Form(...)):
25
+ if not download_link:
26
+ raise HTTPException(status_code=400, detail="Download link is required")
27
+
28
+ # Validate the download_link to be URL-safe
29
+ if not download_link.isalnum() and not all(c in string.ascii_letters + string.digits + '-_' for c in download_link):
30
+ raise HTTPException(status_code=400, detail="Invalid download link format")
31
+
32
+ # Assign the custom download link
33
+ data_storage[download_link] = {
34
+ 'file_name': file.filename,
35
+ 'file_data': await file.read(),
36
+ 'mime_type': file.content_type,
37
+ 'upload_time': datetime.datetime.utcnow()
38
+ }
39
+ return {"download_link": download_link, "message": "File uploaded successfully"}
40
+
41
+ @app.get("/download/{upload_id}")
42
+ async def download_file(upload_id: str):
43
+ # Remove expired files
44
+ expire_time = datetime.datetime.utcnow() - timedelta(minutes=5)
45
+ keys_to_remove = [key for key, value in data_storage.items() if value['upload_time'] < expire_time]
46
+ for key in keys_to_remove:
47
+ del data_storage[key]
48
+
49
  if upload_id not in data_storage:
50
+ raise HTTPException(status_code=404, detail="Upload not found")
51
 
52
+ upload_data = data_storage[upload_id]
53
+ file_name = upload_data['file_name']
54
+ mime_type = upload_data['mime_type'] or mimetypes.guess_type(file_name)[0] or "application/octet-stream"
 
 
55
 
56
+ return Response(content=upload_data['file_data'], media_type=mime_type,
57
  headers={"Content-Disposition": f"attachment; filename={file_name}"})
58
 
59
  if __name__ == "__main__":