paola1 commited on
Commit
f19d5db
·
verified ·
1 Parent(s): 031629f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +149 -55
app.py CHANGED
@@ -1,60 +1,154 @@
1
- from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Response
2
- import uuid
3
- import mimetypes
 
4
  from datetime import datetime, timedelta
 
 
5
 
6
- app = FastAPI()
7
-
8
- # **In-Memory Storage (FOR DEMO ONLY)**
9
- # Replace with Redis, NoSQL DB, or Cloud Storage for Production
10
- data_storage = {}
11
-
12
- @app.post("/upload/file")
13
- async def upload_file(file: UploadFile = File(...)):
14
- upload_id = str(uuid.uuid4())
15
- data_storage[upload_id] = {
16
- 'file_name': file.filename,
17
- 'file_data': await file.read(),
18
- 'mime_type': file.content_type,
19
- 'upload_time': datetime.utcnow()
20
- }
21
- return {"upload_id": upload_id, "message": "File uploaded successfully"}
22
-
23
- @app.post("/upload/custom")
24
- async def upload_custom(file: UploadFile = File(...), download_link: str = Form(...)):
25
- if not download_link:
26
- raise HTTPException(status_code=400, detail="Download link is required")
27
-
28
- # Validate the download_link to be URL-safe
29
- if not download_link.isalnum() and not all(c in string.ascii_letters + string.digits + '-_' for c in download_link):
30
- raise HTTPException(status_code=400, detail="Invalid download link format")
31
-
32
- # Assign the custom download link
33
- data_storage[download_link] = {
34
- 'file_name': file.filename,
35
- 'file_data': await file.read(),
36
- 'mime_type': file.content_type,
37
- 'upload_time': datetime.utcnow()
38
- }
39
- return {"download_link": download_link, "message": "File uploaded successfully"}
40
-
41
- @app.get("/download/{upload_id}")
42
- async def download_file(upload_id: str):
43
- # Remove expired files
44
- expire_time = datetime.utcnow() - timedelta(minutes=5)
45
- keys_to_remove = [key for key, value in data_storage.items() if value['upload_time'] < expire_time]
46
- for key in keys_to_remove:
47
- del data_storage[key]
48
-
49
- if upload_id not in data_storage:
50
- raise HTTPException(status_code=404, detail="Upload not found")
51
-
52
- upload_data = data_storage[upload_id]
53
- file_name = upload_data['file_name']
54
- mime_type = upload_data['mime_type'] or mimetypes.guess_type(file_name)[0] or "application/octet-stream"
55
-
56
- return Response(content=upload_data['file_data'], media_type=mime_type,
57
- headers={"Content-Disposition": f"attachment; filename={file_name}"})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
 
59
  if __name__ == "__main__":
60
  import uvicorn
 
1
+ from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
2
+ from fastapi.responses import JSONResponse
3
+ from pydantic import BaseModel, Field, validator
4
+ from typing import Optional, List, Dict
5
  from datetime import datetime, timedelta
6
+ import json
7
+ import asyncio
8
 
9
+ app = FastAPI(title="User Tracking API")
10
+
11
+ class UserEntry(BaseModel):
12
+ ip_address: str
13
+ device_type: str
14
+ timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
15
+
16
+ @validator('ip_address')
17
+ def validate_ip(cls, v):
18
+ # Simple IP address validation
19
+ parts = v.split('.')
20
+ if len(parts) != 4 or not all(part.isdigit() and 0 <= int(part) <= 255 for part in parts):
21
+ raise ValueError('Invalid IP address format')
22
+ return v
23
+
24
+ class DataResponse(BaseModel):
25
+ total_unique_users: int
26
+ device_info: Dict[str, int]
27
+
28
+ class ImportData(BaseModel):
29
+ data: List[Dict]
30
+
31
+ class ExportData(BaseModel):
32
+ data: List[Dict]
33
+
34
+ # In-memory data storage
35
+ data_store: List[Dict] = []
36
+ data_lock = asyncio.Lock()
37
+
38
+ async def add_user_entry(entry: Dict):
39
+ async with data_lock:
40
+ # Remove existing entry with same IP
41
+ global data_store
42
+ data_store = [e for e in data_store if e['ip_address'] != entry['ip_address']]
43
+ data_store.append(entry)
44
+
45
+ async def get_filtered_data(time_filter: str):
46
+ now = datetime.utcnow()
47
+ if time_filter == "last_hour":
48
+ cutoff = now - timedelta(hours=1)
49
+ elif time_filter == "last_day":
50
+ cutoff = now - timedelta(days=1)
51
+ elif time_filter == "last_7_days":
52
+ cutoff = now - timedelta(days=7)
53
+ else:
54
+ raise ValueError("Invalid time filter. Choose from 'last_hour', 'last_day', 'last_7_days'.")
55
+
56
+ async with data_lock:
57
+ filtered = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff]
58
+ return filtered
59
+
60
+ async def export_data_to_json():
61
+ async with data_lock:
62
+ return json.dumps(data_store, indent=4)
63
+
64
+ async def import_data_from_json(json_data: List[Dict]):
65
+ async with data_lock:
66
+ global data_store
67
+ # Validate and add entries
68
+ for entry in json_data:
69
+ # Basic validation
70
+ if 'ip_address' not in entry or 'device_type' not in entry or 'timestamp' not in entry:
71
+ raise ValueError("Invalid data format")
72
+ # Optionally, add more validation here
73
+ data_store = json_data
74
+
75
+ @app.post("/user_entry", status_code=201)
76
+ async def user_entry(entry: UserEntry):
77
+ try:
78
+ entry_dict = entry.dict()
79
+ await add_user_entry(entry_dict)
80
+ return {"message": "User entry added successfully"}
81
+ except ValueError as ve:
82
+ raise HTTPException(status_code=400, detail=str(ve))
83
+ except Exception as e:
84
+ raise HTTPException(status_code=500, detail="Internal Server Error")
85
+
86
+ @app.get("/user_data", response_model=DataResponse)
87
+ async def get_user_data(time_filter: str = "last_day"):
88
+ try:
89
+ filtered = await get_filtered_data(time_filter)
90
+ unique_users = {entry['ip_address'] for entry in filtered}
91
+ device_info = {}
92
+ for entry in filtered:
93
+ device = entry['device_type']
94
+ device_info[device] = device_info.get(device, 0) + 1
95
+ return {
96
+ "total_unique_users": len(unique_users),
97
+ "device_info": device_info
98
+ }
99
+ except ValueError as ve:
100
+ raise HTTPException(status_code=400, detail=str(ve))
101
+ except Exception as e:
102
+ raise HTTPException(status_code=500, detail="Internal Server Error")
103
+
104
+ @app.get("/export_data", response_model=ExportData)
105
+ async def export_data():
106
+ try:
107
+ data_json = await export_data_to_json()
108
+ data = json.loads(data_json)
109
+ return {"data": data}
110
+ except Exception as e:
111
+ raise HTTPException(status_code=500, detail="Failed to export data")
112
+
113
+ @app.post("/import_data", status_code=200)
114
+ async def import_data(import_payload: ImportData):
115
+ try:
116
+ await import_data_from_json(import_payload.data)
117
+ return {"message": "Data imported successfully"}
118
+ except ValueError as ve:
119
+ raise HTTPException(status_code=400, detail=str(ve))
120
+ except Exception as e:
121
+ raise HTTPException(status_code=500, detail="Failed to import data")
122
+
123
+ async def cleanup_old_data():
124
+ while True:
125
+ await asyncio.sleep(3600) # Run every hour
126
+ cutoff = datetime.utcnow() - timedelta(days=7)
127
+ async with data_lock:
128
+ global data_store
129
+ original_length = len(data_store)
130
+ data_store = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff]
131
+ cleaned = original_length - len(data_store)
132
+ if cleaned > 0:
133
+ print(f"Cleaned up {cleaned} old entries.")
134
+
135
+ @app.on_event("startup")
136
+ async def startup_event():
137
+ asyncio.create_task(cleanup_old_data())
138
+
139
+ @app.exception_handler(HTTPException)
140
+ async def http_exception_handler(request: Request, exc: HTTPException):
141
+ return JSONResponse(
142
+ status_code=exc.status_code,
143
+ content={"error": exc.detail},
144
+ )
145
+
146
+ @app.exception_handler(Exception)
147
+ async def unhandled_exception_handler(request: Request, exc: Exception):
148
+ return JSONResponse(
149
+ status_code=500,
150
+ content={"error": "An unexpected error occurred."},
151
+ )
152
 
153
  if __name__ == "__main__":
154
  import uvicorn