File size: 5,411 Bytes
f19d5db 410351d f19d5db 3457223 f19d5db cbb9517 3457223 a3069a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator
from typing import Optional, List, Dict
from datetime import datetime, timedelta
import json
import asyncio
app = FastAPI(title="User Tracking API")
class UserEntry(BaseModel):
ip_address: str
device_type: str
timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)
@validator('ip_address')
def validate_ip(cls, v):
# Simple IP address validation
parts = v.split('.')
if len(parts) != 4 or not all(part.isdigit() and 0 <= int(part) <= 255 for part in parts):
raise ValueError('Invalid IP address format')
return v
class DataResponse(BaseModel):
total_unique_users: int
device_info: Dict[str, int]
class ImportData(BaseModel):
data: List[Dict]
class ExportData(BaseModel):
data: List[Dict]
# In-memory data storage
data_store: List[Dict] = []
data_lock = asyncio.Lock()
async def add_user_entry(entry: Dict):
async with data_lock:
# Remove existing entry with same IP
global data_store
data_store = [e for e in data_store if e['ip_address'] != entry['ip_address']]
data_store.append(entry)
async def get_filtered_data(time_filter: str):
now = datetime.utcnow()
if time_filter == "last_hour":
cutoff = now - timedelta(hours=1)
elif time_filter == "last_day":
cutoff = now - timedelta(days=1)
elif time_filter == "last_7_days":
cutoff = now - timedelta(days=7)
else:
raise ValueError("Invalid time filter. Choose from 'last_hour', 'last_day', 'last_7_days'.")
async with data_lock:
filtered = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff]
return filtered
async def export_data_to_json():
async with data_lock:
return json.dumps(data_store, indent=4)
async def import_data_from_json(json_data: List[Dict]):
async with data_lock:
global data_store
# Validate and add entries
for entry in json_data:
# Basic validation
if 'ip_address' not in entry or 'device_type' not in entry or 'timestamp' not in entry:
raise ValueError("Invalid data format")
# Optionally, add more validation here
data_store = json_data
@app.post("/user_entry", status_code=201)
async def user_entry(entry: UserEntry):
try:
entry_dict = entry.dict()
await add_user_entry(entry_dict)
return {"message": "User entry added successfully"}
except ValueError as ve:
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/user_data", response_model=DataResponse)
async def get_user_data(time_filter: str = "last_day"):
try:
filtered = await get_filtered_data(time_filter)
unique_users = {entry['ip_address'] for entry in filtered}
device_info = {}
for entry in filtered:
device = entry['device_type']
device_info[device] = device_info.get(device, 0) + 1
return {
"total_unique_users": len(unique_users),
"device_info": device_info
}
except ValueError as ve:
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/export_data", response_model=ExportData)
async def export_data():
try:
data_json = await export_data_to_json()
data = json.loads(data_json)
return {"data": data}
except Exception as e:
raise HTTPException(status_code=500, detail="Failed to export data")
@app.post("/import_data", status_code=200)
async def import_data(import_payload: ImportData):
try:
await import_data_from_json(import_payload.data)
return {"message": "Data imported successfully"}
except ValueError as ve:
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
raise HTTPException(status_code=500, detail="Failed to import data")
async def cleanup_old_data():
while True:
await asyncio.sleep(3600) # Run every hour
cutoff = datetime.utcnow() - timedelta(days=7)
async with data_lock:
global data_store
original_length = len(data_store)
data_store = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff]
cleaned = original_length - len(data_store)
if cleaned > 0:
print(f"Cleaned up {cleaned} old entries.")
@app.on_event("startup")
async def startup_event():
asyncio.create_task(cleanup_old_data())
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": exc.detail},
)
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=500,
content={"error": "An unexpected error occurred."},
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8083, debug=True) |