Spaces:
paola1
/
Sleeping

paola1 commited on
Commit
a5f1617
·
verified ·
1 Parent(s): a3069a0

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +152 -113
app.py CHANGED
@@ -1,153 +1,192 @@
1
- from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
2
- from fastapi.responses import JSONResponse
3
- from pydantic import BaseModel, Field, validator
4
- from typing import Optional, List, Dict
5
  from datetime import datetime, timedelta
 
 
6
  import json
7
- import asyncio
 
 
 
 
 
8
 
9
- app = FastAPI(title="User Tracking API")
 
10
 
 
11
  class UserEntry(BaseModel):
12
  ip_address: str
13
  device_type: str
14
- timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
15
-
16
- @validator('ip_address')
17
- def validate_ip(cls, v):
18
- # Simple IP address validation
19
- parts = v.split('.')
20
- if len(parts) != 4 or not all(part.isdigit() and 0 <= int(part) <= 255 for part in parts):
21
- raise ValueError('Invalid IP address format')
22
- return v
23
-
24
- class DataResponse(BaseModel):
25
- total_unique_users: int
26
- device_info: Dict[str, int]
27
-
28
- class ImportData(BaseModel):
29
- data: List[Dict]
30
-
31
- class ExportData(BaseModel):
32
- data: List[Dict]
33
-
34
- # In-memory data storage
35
- data_store: List[Dict] = []
36
- data_lock = asyncio.Lock()
37
-
38
- async def add_user_entry(entry: Dict):
39
- async with data_lock:
40
- # Remove existing entry with same IP
41
- global data_store
42
- data_store = [e for e in data_store if e['ip_address'] != entry['ip_address']]
43
- data_store.append(entry)
44
-
45
- async def get_filtered_data(time_filter: str):
46
- now = datetime.utcnow()
47
- if time_filter == "last_hour":
48
- cutoff = now - timedelta(hours=1)
49
- elif time_filter == "last_day":
50
- cutoff = now - timedelta(days=1)
51
- elif time_filter == "last_7_days":
52
- cutoff = now - timedelta(days=7)
53
- else:
54
- raise ValueError("Invalid time filter. Choose from 'last_hour', 'last_day', 'last_7_days'.")
55
-
56
- async with data_lock:
57
- filtered = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff]
58
- return filtered
59
-
60
- async def export_data_to_json():
61
- async with data_lock:
62
- return json.dumps(data_store, indent=4)
63
-
64
- async def import_data_from_json(json_data: List[Dict]):
65
- async with data_lock:
66
- global data_store
67
- # Validate and add entries
68
- for entry in json_data:
69
- # Basic validation
70
- if 'ip_address' not in entry or 'device_type' not in entry or 'timestamp' not in entry:
71
- raise ValueError("Invalid data format")
72
- # Optionally, add more validation here
73
- data_store = json_data
74
-
75
- @app.post("/user_entry", status_code=201)
76
- async def user_entry(entry: UserEntry):
77
  try:
78
- entry_dict = entry.dict()
79
- await add_user_entry(entry_dict)
80
- return {"message": "User entry added successfully"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
  except ValueError as ve:
82
  raise HTTPException(status_code=400, detail=str(ve))
83
  except Exception as e:
84
- raise HTTPException(status_code=500, detail="Internal Server Error")
85
 
86
- @app.get("/user_data", response_model=DataResponse)
87
- async def get_user_data(time_filter: str = "last_day"):
 
88
  try:
89
- filtered = await get_filtered_data(time_filter)
90
- unique_users = {entry['ip_address'] for entry in filtered}
91
- device_info = {}
92
- for entry in filtered:
93
- device = entry['device_type']
94
- device_info[device] = device_info.get(device, 0) + 1
 
 
 
 
 
 
 
 
 
 
 
 
95
  return {
96
- "total_unique_users": len(unique_users),
97
- "device_info": device_info
98
  }
99
- except ValueError as ve:
100
- raise HTTPException(status_code=400, detail=str(ve))
101
  except Exception as e:
102
- raise HTTPException(status_code=500, detail="Internal Server Error")
103
 
104
- @app.get("/export_data", response_model=ExportData)
105
- async def export_data():
 
106
  try:
107
- data_json = await export_data_to_json()
108
- data = json.loads(data_json)
109
- return {"data": data}
110
  except Exception as e:
111
- raise HTTPException(status_code=500, detail="Failed to export data")
112
 
113
- @app.post("/import_data", status_code=200)
114
- async def import_data(import_payload: ImportData):
 
115
  try:
116
- await import_data_from_json(import_payload.data)
117
- return {"message": "Data imported successfully"}
118
- except ValueError as ve:
119
- raise HTTPException(status_code=400, detail=str(ve))
 
 
 
 
 
 
 
 
 
 
 
120
  except Exception as e:
121
- raise HTTPException(status_code=500, detail="Failed to import data")
122
 
123
- async def cleanup_old_data():
 
 
124
  while True:
125
- await asyncio.sleep(3600) # Run every hour
126
- cutoff = datetime.utcnow() - timedelta(days=7)
127
- async with data_lock:
128
- global data_store
129
- original_length = len(data_store)
130
- data_store = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff]
131
- cleaned = original_length - len(data_store)
132
- if cleaned > 0:
133
- print(f"Cleaned up {cleaned} old entries.")
134
 
135
  @app.on_event("startup")
136
  async def startup_event():
137
- asyncio.create_task(cleanup_old_data())
 
 
 
 
 
 
138
 
139
  @app.exception_handler(HTTPException)
140
  async def http_exception_handler(request: Request, exc: HTTPException):
141
  return JSONResponse(
142
  status_code=exc.status_code,
143
- content={"error": exc.detail},
144
  )
145
 
146
  @app.exception_handler(Exception)
147
- async def unhandled_exception_handler(request: Request, exc: Exception):
148
  return JSONResponse(
149
  status_code=500,
150
- content={"error": "An unexpected error occurred."},
151
  )
152
 
153
  if __name__ == "__main__":
 
1
+ from typing import List, Dict, Optional
 
 
 
2
  from datetime import datetime, timedelta
3
+ from fastapi import FastAPI, HTTPException, Query, Body
4
+ from pydantic import BaseModel, validator
5
  import json
6
+ import os
7
+
8
+ app = FastAPI()
9
+
10
+ # Data storage (in-memory)
11
+ user_data: Dict[str, dict] = {} # Key: IP address, Value: User entry
12
 
13
+ # Data storage file
14
+ DATA_FILE = "user_data.json"
15
 
16
+ # --- Data Models ---
17
  class UserEntry(BaseModel):
18
  ip_address: str
19
  device_type: str
20
+ timestamp: datetime
21
+
22
+ @validator("ip_address")
23
+ def validate_ip_address(cls, value):
24
+ parts = value.split('.')
25
+ if len(parts) != 4:
26
+ raise ValueError("Invalid IP address format")
27
+ for part in parts:
28
+ try:
29
+ num = int(part)
30
+ if not 0 <= num <= 255:
31
+ raise ValueError("Invalid IP address value")
32
+ except ValueError:
33
+ raise ValueError("Invalid IP address value")
34
+ return value
35
+
36
+ # --- Helper Functions ---
37
+ def load_data():
38
+ """Loads data from the JSON file into the in-memory store."""
39
+ global user_data
40
+ if os.path.exists(DATA_FILE):
41
+ try:
42
+ with open(DATA_FILE, "r") as f:
43
+ data = json.load(f)
44
+ # Convert timestamp strings back to datetime objects
45
+ user_data = {
46
+ ip: {**entry, "timestamp": datetime.fromisoformat(entry["timestamp"])}
47
+ for ip, entry in data.items()
48
+ }
49
+ except Exception as e:
50
+ print(f"Error loading data from {DATA_FILE}: {e}")
51
+ # Handle the error gracefully, maybe start with an empty dataset
52
+ user_data = {}
53
+
54
+ def save_data():
55
+ """Saves the in-memory data to the JSON file."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  try:
57
+ with open(DATA_FILE, "w") as f:
58
+ # Convert datetime objects to ISO format for JSON serialization
59
+ serializable_data = {
60
+ ip: {**entry, "timestamp": entry["timestamp"].isoformat()}
61
+ for ip, entry in user_data.items()
62
+ }
63
+ json.dump(serializable_data, f, indent=4)
64
+ except Exception as e:
65
+ print(f"Error saving data to {DATA_FILE}: {e}")
66
+
67
+ def clean_old_data():
68
+ """Deletes data older than 7 days."""
69
+ global user_data
70
+ cutoff_time = datetime.now() - timedelta(days=7)
71
+ ips_to_delete = [ip for ip, entry in user_data.items() if entry["timestamp"] < cutoff_time]
72
+ for ip in ips_to_delete:
73
+ del user_data[ip]
74
+ if ips_to_delete:
75
+ save_data() # Save changes after deleting old data
76
+
77
+ # Load data on startup
78
+ load_data()
79
+
80
+ # --- API Endpoints ---
81
+
82
+ @app.post("/entry/", response_model=UserEntry, status_code=201)
83
+ async def create_user_entry(entry_data: UserEntry):
84
+ """Endpoint to record user entry."""
85
+ try:
86
+ entry_data.timestamp = datetime.now()
87
+ user_data[entry_data.ip_address] = entry_data.dict()
88
+ save_data()
89
+ return entry_data
90
  except ValueError as ve:
91
  raise HTTPException(status_code=400, detail=str(ve))
92
  except Exception as e:
93
+ raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
94
 
95
+ @app.get("/analytics/")
96
+ async def get_user_analytics(period: str = Query(..., enum=["last_hour", "last_day", "last_7_day"])):
97
+ """Endpoint to get user analytics."""
98
  try:
99
+ clean_old_data() # Clean data before processing
100
+
101
+ now = datetime.now()
102
+ if period == "last_hour":
103
+ cutoff = now - timedelta(hours=1)
104
+ elif period == "last_day":
105
+ cutoff = now - timedelta(days=1)
106
+ elif period == "last_7_day":
107
+ cutoff = now - timedelta(days=7)
108
+ else:
109
+ raise HTTPException(status_code=400, detail="Invalid period specified")
110
+
111
+ filtered_data = [entry for entry in user_data.values() if entry["timestamp"] >= cutoff]
112
+ unique_users = len(set(entry["ip_address"] for entry in filtered_data))
113
+ device_counts: Dict[str, int] = {}
114
+ for entry in filtered_data:
115
+ device_counts[entry["device_type"]] = device_counts.get(entry["device_type"], 0) + 1
116
+
117
  return {
118
+ "total_unique_users": unique_users,
119
+ "device_type_info": device_counts,
120
  }
121
+ except HTTPException as he:
122
+ raise he
123
  except Exception as e:
124
+ raise HTTPException(status_code=500, detail=f"Error generating analytics: {e}")
125
 
126
+ @app.get("/export/", response_model=Dict[str, UserEntry])
127
+ async def export_user_data():
128
+ """Endpoint to export all user data in JSON format."""
129
  try:
130
+ clean_old_data() # Ensure no old data is exported
131
+ return user_data
 
132
  except Exception as e:
133
+ raise HTTPException(status_code=500, detail=f"Error exporting data: {e}")
134
 
135
+ @app.post("/import/")
136
+ async def import_user_data(data: Dict[str, dict] = Body(...)):
137
+ """Endpoint to import user data from JSON."""
138
  try:
139
+ imported_count = 0
140
+ for ip, entry_dict in data.items():
141
+ try:
142
+ # Validate the imported entry
143
+ entry = UserEntry(**entry_dict)
144
+ entry.timestamp = datetime.fromisoformat(entry_dict["timestamp"]) # Ensure timestamp is datetime
145
+ user_data[ip] = entry.dict()
146
+ imported_count += 1
147
+ except Exception as e:
148
+ print(f"Error importing entry for IP {ip}: {e}") # Log individual import errors
149
+
150
+ save_data()
151
+ return {"message": f"Successfully imported {imported_count} user entries."}
152
+ except json.JSONDecodeError:
153
+ raise HTTPException(status_code=400, detail="Invalid JSON format")
154
  except Exception as e:
155
+ raise HTTPException(status_code=500, detail=f"Error importing data: {e}")
156
 
157
+ # --- Background Task (Optional, for regular cleanup) ---
158
+ async def scheduled_cleanup():
159
+ """Periodically clean up old data."""
160
  while True:
161
+ clean_old_data()
162
+ await asyncio.sleep(60 * 60) # Clean every hour
163
+
164
+ # Import asyncio if you use the background task
165
+ import asyncio
166
+ from fastapi import BackgroundTasks
 
 
 
167
 
168
  @app.on_event("startup")
169
  async def startup_event():
170
+ # You can uncomment this to run the background task
171
+ asyncio.create_task(scheduled_cleanup())
172
+ pass
173
+
174
+ # --- Error Handling (Advanced - using exception handlers) ---
175
+ from fastapi import Request
176
+ from fastapi.responses import JSONResponse
177
 
178
  @app.exception_handler(HTTPException)
179
  async def http_exception_handler(request: Request, exc: HTTPException):
180
  return JSONResponse(
181
  status_code=exc.status_code,
182
+ content={"message": exc.detail},
183
  )
184
 
185
  @app.exception_handler(Exception)
186
+ async def general_exception_handler(request: Request, exc: Exception):
187
  return JSONResponse(
188
  status_code=500,
189
+ content={"message": f"An unexpected error occurred: {exc}"},
190
  )
191
 
192
  if __name__ == "__main__":