from fastapi import FastAPI, HTTPException, Request, BackgroundTasks from fastapi.responses import JSONResponse from pydantic import BaseModel, Field, validator from typing import Optional, List, Dict from datetime import datetime, timedelta import json import asyncio app = FastAPI(title="User Tracking API") class UserEntry(BaseModel): ip_address: str device_type: str timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow) @validator('ip_address') def validate_ip(cls, v): # Simple IP address validation parts = v.split('.') if len(parts) != 4 or not all(part.isdigit() and 0 <= int(part) <= 255 for part in parts): raise ValueError('Invalid IP address format') return v class DataResponse(BaseModel): total_unique_users: int device_info: Dict[str, int] class ImportData(BaseModel): data: List[Dict] class ExportData(BaseModel): data: List[Dict] # In-memory data storage data_store: List[Dict] = [] data_lock = asyncio.Lock() async def add_user_entry(entry: Dict): async with data_lock: # Remove existing entry with same IP global data_store data_store = [e for e in data_store if e['ip_address'] != entry['ip_address']] data_store.append(entry) async def get_filtered_data(time_filter: str): now = datetime.utcnow() if time_filter == "last_hour": cutoff = now - timedelta(hours=1) elif time_filter == "last_day": cutoff = now - timedelta(days=1) elif time_filter == "last_7_days": cutoff = now - timedelta(days=7) else: raise ValueError("Invalid time filter. Choose from 'last_hour', 'last_day', 'last_7_days'.") async with data_lock: filtered = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff] return filtered async def export_data_to_json(): async with data_lock: return json.dumps(data_store, indent=4) async def import_data_from_json(json_data: List[Dict]): async with data_lock: global data_store # Validate and add entries for entry in json_data: # Basic validation if 'ip_address' not in entry or 'device_type' not in entry or 'timestamp' not in entry: raise ValueError("Invalid data format") # Optionally, add more validation here data_store = json_data @app.post("/user_entry", status_code=201) async def user_entry(entry: UserEntry): try: entry_dict = entry.dict() await add_user_entry(entry_dict) return {"message": "User entry added successfully"} except ValueError as ve: raise HTTPException(status_code=400, detail=str(ve)) except Exception as e: raise HTTPException(status_code=500, detail="Internal Server Error") @app.get("/user_data", response_model=DataResponse) async def get_user_data(time_filter: str = "last_day"): try: filtered = await get_filtered_data(time_filter) unique_users = {entry['ip_address'] for entry in filtered} device_info = {} for entry in filtered: device = entry['device_type'] device_info[device] = device_info.get(device, 0) + 1 return { "total_unique_users": len(unique_users), "device_info": device_info } except ValueError as ve: raise HTTPException(status_code=400, detail=str(ve)) except Exception as e: raise HTTPException(status_code=500, detail="Internal Server Error") @app.get("/export_data", response_model=ExportData) async def export_data(): try: data_json = await export_data_to_json() data = json.loads(data_json) return {"data": data} except Exception as e: raise HTTPException(status_code=500, detail="Failed to export data") @app.post("/import_data", status_code=200) async def import_data(import_payload: ImportData): try: await import_data_from_json(import_payload.data) return {"message": "Data imported successfully"} except ValueError as ve: raise HTTPException(status_code=400, detail=str(ve)) except Exception as e: raise HTTPException(status_code=500, detail="Failed to import data") async def cleanup_old_data(): while True: await asyncio.sleep(3600) # Run every hour cutoff = datetime.utcnow() - timedelta(days=7) async with data_lock: global data_store original_length = len(data_store) data_store = [e for e in data_store if datetime.fromisoformat(e['timestamp']) >= cutoff] cleaned = original_length - len(data_store) if cleaned > 0: print(f"Cleaned up {cleaned} old entries.") @app.on_event("startup") async def startup_event(): asyncio.create_task(cleanup_old_data()) @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): return JSONResponse( status_code=exc.status_code, content={"error": exc.detail}, ) @app.exception_handler(Exception) async def unhandled_exception_handler(request: Request, exc: Exception): return JSONResponse( status_code=500, content={"error": "An unexpected error occurred."}, ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8083)