|
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks |
|
from fastapi.responses import JSONResponse |
|
from pydantic import BaseModel, Field, validator |
|
from typing import Optional, List, Dict |
|
from datetime import datetime, timedelta |
|
import json |
|
import asyncio |
|
|
|
app = FastAPI(title="User Tracking API") |
|
|
|
class UserEntry(BaseModel): |
|
ip_address: str |
|
device_type: str |
|
timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow) |
|
|
|
@validator('ip_address') |
|
def validate_ip(cls, v): |
|
|
|
parts = v.split('.') |
|
if len(parts) != 4 or not all(part.isdigit() and 0 <= int(part) <= 255 for part in parts): |
|
raise ValueError('Invalid IP address format') |
|
return v |
|
|
|
class DataResponse(BaseModel): |
|
total_unique_users: int |
|
device_info: Dict[str, int] |
|
|
|
class ImportData(BaseModel): |
|
data: List[Dict] |
|
|
|
class ExportData(BaseModel): |
|
data: List[Dict] |
|
|
|
|
|
data_store: List[Dict] = [] |
|
data_lock = asyncio.Lock() |
|
|
|
async def add_user_entry(entry: Dict): |
|
async with data_lock: |
|
|
|
global data_store |
|
data_store = [e for e in data_store if e['ip_address'] != entry['ip_address']] |
|
data_store.append(entry) |
|
|
|
async def get_filtered_data(time_filter: str): |
|
now = datetime.utcnow() |
|
if time_filter == "last_hour": |
|
cutoff = now - timedelta(hours=1) |
|
elif time_filter == "last_day": |
|
cutoff = now - timedelta(days=1) |
|
elif time_filter == "last_7_days": |
|
cutoff = now - timedelta(days=7) |
|
else: |
|
raise ValueError("Invalid time filter. Choose from 'last_hour', 'last_day', 'last_7_days'.") |
|
|
|
async with data_lock: |
|
filtered = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff] |
|
return filtered |
|
|
|
async def export_data_to_json(): |
|
async with data_lock: |
|
return json.dumps(data_store, indent=4) |
|
|
|
async def import_data_from_json(json_data: List[Dict]): |
|
async with data_lock: |
|
global data_store |
|
|
|
for entry in json_data: |
|
|
|
if 'ip_address' not in entry or 'device_type' not in entry or 'timestamp' not in entry: |
|
raise ValueError("Invalid data format") |
|
|
|
data_store = json_data |
|
|
|
@app.post("/user_entry", status_code=201) |
|
async def user_entry(entry: UserEntry): |
|
try: |
|
entry_dict = entry.dict() |
|
await add_user_entry(entry_dict) |
|
return {"message": "User entry added successfully"} |
|
except ValueError as ve: |
|
raise HTTPException(status_code=400, detail=str(ve)) |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail="Internal Server Error") |
|
|
|
@app.get("/user_data", response_model=DataResponse) |
|
async def get_user_data(time_filter: str = "last_day"): |
|
try: |
|
filtered = await get_filtered_data(time_filter) |
|
unique_users = {entry['ip_address'] for entry in filtered} |
|
device_info = {} |
|
for entry in filtered: |
|
device = entry['device_type'] |
|
device_info[device] = device_info.get(device, 0) + 1 |
|
return { |
|
"total_unique_users": len(unique_users), |
|
"device_info": device_info |
|
} |
|
except ValueError as ve: |
|
raise HTTPException(status_code=400, detail=str(ve)) |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail="Internal Server Error") |
|
|
|
@app.get("/export_data", response_model=ExportData) |
|
async def export_data(): |
|
try: |
|
data_json = await export_data_to_json() |
|
data = json.loads(data_json) |
|
return {"data": data} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail="Failed to export data") |
|
|
|
@app.post("/import_data", status_code=200) |
|
async def import_data(import_payload: ImportData): |
|
try: |
|
await import_data_from_json(import_payload.data) |
|
return {"message": "Data imported successfully"} |
|
except ValueError as ve: |
|
raise HTTPException(status_code=400, detail=str(ve)) |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail="Failed to import data") |
|
|
|
async def cleanup_old_data(): |
|
while True: |
|
await asyncio.sleep(3600) |
|
cutoff = datetime.utcnow() - timedelta(days=7) |
|
async with data_lock: |
|
global data_store |
|
original_length = len(data_store) |
|
data_store = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff] |
|
cleaned = original_length - len(data_store) |
|
if cleaned > 0: |
|
print(f"Cleaned up {cleaned} old entries.") |
|
|
|
@app.on_event("startup") |
|
async def startup_event(): |
|
asyncio.create_task(cleanup_old_data()) |
|
|
|
@app.exception_handler(HTTPException) |
|
async def http_exception_handler(request: Request, exc: HTTPException): |
|
return JSONResponse( |
|
status_code=exc.status_code, |
|
content={"error": exc.detail}, |
|
) |
|
|
|
@app.exception_handler(Exception) |
|
async def unhandled_exception_handler(request: Request, exc: Exception): |
|
return JSONResponse( |
|
status_code=500, |
|
content={"error": "An unexpected error occurred."}, |
|
) |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=8083) |