"""User-entry tracking API.

Stores one entry per IP address in memory, mirrors the store to a JSON file,
and exposes endpoints to record entries, compute simple analytics, and
export/import the whole data set. Entries older than seven days are purged.

All stored timestamps are *naive local time* so that they can be compared
with ``datetime.now()`` without raising ``TypeError``.
"""

import asyncio
import json
import os
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from fastapi import BackgroundTasks, Body, FastAPI, HTTPException, Query, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, validator

app = FastAPI()

# Data storage (in-memory). Key: IP address, Value: user entry as a dict with
# the keys "ip_address", "device_type" and "timestamp" (a datetime).
user_data: Dict[str, dict] = {}

# Data storage file
DATA_FILE = "user_data.json"

# Entries older than this are removed by clean_old_data().
RETENTION = timedelta(days=7)

# Look-back window for each value of the /analytics/ "period" parameter.
_PERIOD_WINDOWS: Dict[str, timedelta] = {
    "last_hour": timedelta(hours=1),
    "last_day": timedelta(days=1),
    "last_7_day": timedelta(days=7),
}

# Strong reference to the periodic cleanup task so that it is not garbage
# collected while it is still running.
_cleanup_task: Optional["asyncio.Task"] = None


def _to_naive_local(ts: datetime) -> datetime:
    """Return *ts* as a naive datetime expressed in local time.

    Naive values are returned unchanged. Aware values are converted to the
    system time zone and stripped of their tzinfo, which keeps every stored
    timestamp comparable with ``datetime.now()``.
    """
    if ts.tzinfo is None:
        return ts
    return ts.astimezone().replace(tzinfo=None)


# --- Data Models ---

class UserEntry(BaseModel):
    """A single recorded visit: who (IP), with what (device), and when."""

    ip_address: str
    device_type: str
    timestamp: datetime

    @validator("ip_address")
    def validate_ip_address(cls, value):
        """Accept only dotted-quad IPv4 addresses with octets in 0..255."""
        parts = value.split(".")
        if len(parts) != 4:
            raise ValueError("Invalid IP address format")
        for part in parts:
            # int() on its own also accepts "+1", " 1", "1_0" and non-ASCII
            # digits, none of which are valid octets.
            if not (part.isascii() and part.isdigit()) or int(part) > 255:
                raise ValueError("Invalid IP address value")
        return value

    @validator("timestamp")
    def normalize_timestamp(cls, value):
        """Store timestamps as naive local time (see module docstring)."""
        return _to_naive_local(value)


# --- Helper Functions ---

def load_data() -> None:
    """Load the JSON file into the in-memory store.

    A missing file leaves the store untouched. Any read or parse failure is
    reported and the store is reset to empty so the service can still start.
    """
    global user_data
    if not os.path.exists(DATA_FILE):
        return
    try:
        with open(DATA_FILE, "r", encoding="utf-8") as f:
            raw = json.load(f)
        # Convert timestamp strings back to datetime objects, normalising any
        # aware value that an earlier run may have persisted.
        user_data = {
            ip: {
                **entry,
                "timestamp": _to_naive_local(
                    datetime.fromisoformat(entry["timestamp"])
                ),
            }
            for ip, entry in raw.items()
        }
    except Exception as e:  # deliberate best-effort: never block startup
        print(f"Error loading data from {DATA_FILE}: {e}")
        user_data = {}


def save_data() -> None:
    """Persist the in-memory store to the JSON file.

    The data is serialised before the target file is touched and then written
    to a temporary file that replaces the target, so a failure cannot leave a
    truncated or half-written data file behind. Failures are reported, not
    raised.
    """
    tmp_path = f"{DATA_FILE}.tmp"
    try:
        # Convert datetime objects to ISO format for JSON serialization.
        serializable_data = {
            ip: {**entry, "timestamp": entry["timestamp"].isoformat()}
            for ip, entry in user_data.items()
        }
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(serializable_data, f, indent=4)
        os.replace(tmp_path, DATA_FILE)
    except Exception as e:  # deliberate best-effort: persistence is optional
        print(f"Error saving data to {DATA_FILE}: {e}")


def clean_old_data() -> None:
    """Delete entries older than the retention period and persist if changed."""
    cutoff_time = datetime.now() - RETENTION
    ips_to_delete = [
        ip for ip, entry in user_data.items() if entry["timestamp"] < cutoff_time
    ]
    for ip in ips_to_delete:
        del user_data[ip]
    if ips_to_delete:
        save_data()  # Save changes after deleting old data


# Load data on startup
load_data()


# --- API Endpoints ---

@app.post("/auto_entry/", response_model=UserEntry, status_code=201)
async def create_auto_user_entry(
    request: Request,
    device_type: str = Body(...),
    timestamp: Optional[datetime] = Body(None),
):
    """
    Record a user entry using the IP address of the calling client.

    The body supplies ``device_type`` and an optional ``timestamp``; when the
    timestamp is omitted the current time is used. Responds with 400 when the
    client address is unavailable or fails validation.
    """
    # Checked outside the try block so the 400 is not re-wrapped as a 500.
    if request.client is None:
        raise HTTPException(
            status_code=400, detail="Unable to determine client IP address"
        )

    try:
        ip_address = request.client.host
        if timestamp is None:
            timestamp = datetime.now()

        entry_data = UserEntry(
            ip_address=ip_address,
            device_type=device_type,
            timestamp=timestamp,
        )

        user_data[ip_address] = entry_data.dict()
        save_data()
        return entry_data
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}")


@app.post("/entry/", response_model=UserEntry, status_code=201)
async def create_user_entry(entry_data: UserEntry):
    """Record a user entry; the supplied timestamp is replaced by the current time."""
    try:
        entry_data.timestamp = datetime.now()
        user_data[entry_data.ip_address] = entry_data.dict()
        save_data()
        return entry_data
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}")


@app.get("/analytics/")
async def get_user_analytics(
    period: str = Query(..., enum=list(_PERIOD_WINDOWS)),
):
    """
    Return the number of unique users and a per-device-type count.

    Only entries whose timestamp falls inside the requested ``period`` are
    counted. Responds with 400 for an unknown period.
    """
    window = _PERIOD_WINDOWS.get(period)
    if window is None:
        raise HTTPException(status_code=400, detail="Invalid period specified")

    try:
        clean_old_data()  # Clean data before processing
        cutoff = datetime.now() - window

        recent = [
            entry for entry in user_data.values() if entry["timestamp"] >= cutoff
        ]
        unique_users = len({entry["ip_address"] for entry in recent})
        device_counts = Counter(entry["device_type"] for entry in recent)

        return {
            "total_unique_users": unique_users,
            "device_type_info": dict(device_counts),
        }
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error generating analytics: {e}"
        )


@app.get("/export/", response_model=Dict[str, UserEntry])
async def export_user_data():
    """Export all retained user data, keyed by IP address."""
    try:
        clean_old_data()  # Ensure no old data is exported
        return user_data
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error exporting data: {e}")


@app.post("/import/")
async def import_user_data(data: Dict[str, dict] = Body(...)):
    """
    Import user entries from a JSON object keyed by IP address.

    Each value is validated as a ``UserEntry``; entries that fail validation
    are reported on stdout and skipped. The response states how many entries
    were imported.
    """
    # The request body is parsed before this handler runs, so there is no
    # JSON decoding to guard against here.
    try:
        imported_count = 0
        for ip, entry_dict in data.items():
            try:
                # Validation also parses and normalises the timestamp, so no
                # second fromisoformat() pass is needed.
                entry = UserEntry(**entry_dict)
            except Exception as e:
                # Log individual import errors and keep going.
                print(f"Error importing entry for IP {ip}: {e}")
                continue
            user_data[ip] = entry.dict()
            imported_count += 1

        save_data()
        return {"message": f"Successfully imported {imported_count} user entries."}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error importing data: {e}")


# --- Background Task (regular cleanup) ---

async def scheduled_cleanup():
    """Purge expired entries once an hour for as long as the app runs."""
    while True:
        try:
            clean_old_data()
        except Exception as e:
            # One failed pass must not terminate the periodic task.
            print(f"Error during scheduled cleanup: {e}")
        await asyncio.sleep(60 * 60)  # Clean every hour


@app.on_event("startup")
async def startup_event():
    """Start the hourly cleanup task when the application starts."""
    global _cleanup_task
    _cleanup_task = asyncio.create_task(scheduled_cleanup())


# --- Error Handling (using exception handlers) ---

@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render HTTPException as ``{"message": <detail>}`` with its status code."""
    return JSONResponse(
        status_code=exc.status_code,
        content={"message": exc.detail},
    )


@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Render any unhandled exception as a 500 response."""
    return JSONResponse(
        status_code=500,
        content={"message": f"An unexpected error occurred: {exc}"},
    )


if __name__ == "__main__":
    import uvicorn

    # NOTE(review): the `debug` setting was removed from uvicorn (0.18.0), so
    # it is no longer passed here - confirm against the pinned uvicorn version.
    uvicorn.run(app, host="0.0.0.0", port=8083)