|
from typing import List, Dict, Optional |
|
from datetime import datetime, timedelta |
|
from fastapi import FastAPI, HTTPException, Query, Body, Request |
|
from pydantic import BaseModel, validator |
|
import json |
|
import os |
|
|
|
# ASGI application object; every route and handler in this module registers on it.
app = FastAPI()

# In-memory store. The create endpoints key it by client IP address and store
# the entry as a plain dict with "ip_address", "device_type" and "timestamp".
user_data: Dict[str, dict] = {}

# JSON file the store is persisted to; a relative path, so it resolves
# against the process's working directory.
DATA_FILE = "user_data.json"
|
|
|
|
|
class UserEntry(BaseModel):
    """One recorded client entry: address, device label and timestamp.

    ``ip_address`` must be a dotted-quad IPv4 string; see
    ``validate_ip_address`` for the exact rules.
    """

    # Dotted-quad IPv4 address, e.g. "192.168.0.1".
    ip_address: str
    # Device label; stored as given, no validation beyond being a string.
    device_type: str
    # Timestamp associated with the entry.
    timestamp: datetime

    @validator("ip_address")
    def validate_ip_address(cls, value):
        """Accept only four dot-separated decimal octets, each in 0-255.

        Returns:
            The unchanged ``value`` when it is valid.

        Raises:
            ValueError: "Invalid IP address format" when there are not
                exactly four dot-separated parts; "Invalid IP address value"
                when a part is not a plain decimal number in 0-255.
        """
        octets = value.split(".")
        if len(octets) != 4:
            raise ValueError("Invalid IP address format")

        for octet in octets:
            # int() alone is too lenient: it also parses "+1", " 1 ", "1_0",
            # "-0" and non-ASCII digits, none of which belong in an address.
            if not (octet.isascii() and octet.isdigit()):
                raise ValueError("Invalid IP address value")
            # Leading zeros do not change the magnitude; anything with more
            # than three significant digits can never be <= 255, so reject it
            # without converting an arbitrarily long digit run.
            significant = octet.lstrip("0")
            if len(significant) > 3 or int(significant or "0") > 255:
                raise ValueError("Invalid IP address value")

        return value
|
|
|
|
|
def load_data():
    """Load persisted entries from ``DATA_FILE`` into the in-memory store.

    A missing file leaves ``user_data`` untouched. Any other failure
    (unreadable file, invalid JSON, malformed entry) is reported on stdout
    and the store is reset to an empty dict, so a bad file never prevents
    the module from loading.
    """
    global user_data
    try:
        # Open directly instead of checking existence first: the file could
        # disappear between the check and the open.
        with open(DATA_FILE, "r", encoding="utf-8") as f:
            raw = json.load(f)

        loaded = {}
        for ip, entry in raw.items():
            stamp = datetime.fromisoformat(entry["timestamp"])
            # The rest of the module compares timestamps against the naive
            # datetime.now(); an offset-aware value would raise TypeError
            # there, so convert it to naive local time on the way in.
            if stamp.tzinfo is not None:
                stamp = stamp.astimezone().replace(tzinfo=None)
            loaded[ip] = {**entry, "timestamp": stamp}
        user_data = loaded
    except FileNotFoundError:
        # Nothing has been persisted yet; keep whatever is in memory.
        return
    except Exception as e:
        print(f"Error loading data from {DATA_FILE}: {e}")
        # Start from a clean store rather than a partially loaded one.
        user_data = {}
|
|
|
def save_data():
    """Persist the in-memory store to ``DATA_FILE`` as indented JSON.

    Timestamps are written in ISO 8601 form. The payload is built first and
    written to a sibling temporary file that then replaces ``DATA_FILE``, so
    a failure while serialising or writing leaves the previous file intact
    instead of truncated. Errors are reported on stdout and not raised.
    """
    tmp_path = f"{DATA_FILE}.tmp"
    try:
        # Build the payload before touching any file: if an entry cannot be
        # serialised, nothing on disk has changed yet.
        serializable_data = {
            ip: {**entry, "timestamp": entry["timestamp"].isoformat()}
            for ip, entry in user_data.items()
        }
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(serializable_data, f, indent=4)
        # Swap the finished file into place in one step.
        os.replace(tmp_path, DATA_FILE)
    except Exception as e:
        print(f"Error saving data to {DATA_FILE}: {e}")
|
|
|
def clean_old_data():
    """Remove every stored entry whose timestamp is more than 7 days old.

    The store is written back to disk only when at least one entry was
    removed.
    """
    threshold = datetime.now() - timedelta(days=7)

    # Collect the keys first; deleting while iterating the dict is an error.
    expired = []
    for address, record in user_data.items():
        if record["timestamp"] < threshold:
            expired.append(address)

    if not expired:
        return

    for address in expired:
        del user_data[address]
    save_data()
|
|
|
|
|
# Populate the in-memory store from disk once, when the module is imported.
load_data()
|
|
|
|
|
@app.post("/auto_entry/", response_model=UserEntry, status_code=201)
async def create_auto_user_entry(
    request: Request,
    device_type: str = Body(...),
    timestamp: Optional[datetime] = Body(None),
):
    """Record an entry for the calling client, taking the IP from the request.

    Args:
        request: Incoming request; its client host becomes ``ip_address``.
        device_type: Device label from the request body.
        timestamp: Optional timestamp from the request body. Defaults to the
            server's current time. An offset-aware value is converted to
            naive local time so it compares cleanly with ``datetime.now()``.

    Returns:
        The stored ``UserEntry``.

    Raises:
        HTTPException: 400 when the client address is unavailable or the
            entry fails validation; 500 on any other error.
    """
    # The client address is not guaranteed to be present. Checked outside
    # the try block so the 400 below is not rewrapped as a 500.
    if request.client is None:
        raise HTTPException(status_code=400, detail="Could not determine client IP address")

    try:
        ip_address = request.client.host

        if timestamp is None:
            timestamp = datetime.now()
        elif timestamp.tzinfo is not None:
            # Stored timestamps are compared against naive datetime.now();
            # mixing aware and naive values raises TypeError there.
            timestamp = timestamp.astimezone().replace(tzinfo=None)

        entry_data = UserEntry(
            ip_address=ip_address,
            device_type=device_type,
            timestamp=timestamp,
        )

        # One entry per IP: a newer entry overwrites the previous one.
        user_data[ip_address] = entry_data.dict()
        save_data()

        return entry_data
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
|
|
|
@app.post("/entry/", response_model=UserEntry, status_code=201)
async def create_user_entry(entry_data: UserEntry):
    """Store an entry supplied in the request body.

    The submitted timestamp is overwritten with the server's current time
    before the entry is stored under its IP address and persisted.

    Raises:
        HTTPException: 400 on a ValueError, 500 on any other error.
    """
    try:
        entry_data.timestamp = datetime.now()
        record = entry_data.dict()
        user_data[entry_data.ip_address] = record
        save_data()
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
    else:
        return entry_data
|
|
|
@app.get("/analytics/")
async def get_user_analytics(period: str = Query(..., enum=["last_hour", "last_day", "last_7_day"])):
    """Summarise stored entries that fall inside the requested time window.

    Expired entries are pruned first. The response holds the number of
    distinct IP addresses and a per-device-type entry count.

    Raises:
        HTTPException: 400 for an unknown ``period``; 500 on any other error.
    """
    lookback = {
        "last_hour": timedelta(hours=1),
        "last_day": timedelta(days=1),
        "last_7_day": timedelta(days=7),
    }

    try:
        clean_old_data()

        now = datetime.now()
        window = lookback.get(period)
        if window is None:
            raise HTTPException(status_code=400, detail="Invalid period specified")
        cutoff = now - window

        recent = [record for record in user_data.values() if record["timestamp"] >= cutoff]
        distinct_ips = {record["ip_address"] for record in recent}

        device_counts: Dict[str, int] = {}
        for record in recent:
            kind = record["device_type"]
            device_counts[kind] = device_counts.get(kind, 0) + 1

        return {
            "total_unique_users": len(distinct_ips),
            "device_type_info": device_counts,
        }
    except HTTPException:
        # Let the deliberate 400 above through unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating analytics: {e}")
|
|
|
@app.get("/export/", response_model=Dict[str, UserEntry])
async def export_user_data():
    """Return the whole store after pruning entries older than 7 days.

    Raises:
        HTTPException: 500 when pruning fails.
    """
    try:
        clean_old_data()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error exporting data: {e}")
    return user_data
|
|
|
@app.post("/import/")
async def import_user_data(data: Dict[str, dict] = Body(...)):
    """Merge entries from a JSON object into the store and persist it.

    Each value is validated as a ``UserEntry``. Entries that fail are
    reported on stdout and skipped; the rest are stored under their
    validated IP address, overwriting any existing entry for that address.

    Args:
        data: Mapping whose values are entry dicts. The outer key is used
            only in the error message for a rejected entry.

    Returns:
        A dict with a ``message`` stating how many entries were imported.

    Raises:
        HTTPException: 500 on an unexpected error outside per-entry handling.
    """
    try:
        imported_count = 0
        for ip, entry_dict in data.items():
            try:
                # Validation already parses the timestamp; parsing it again
                # by hand would reject inputs the model has accepted.
                entry = UserEntry(**entry_dict)
                if entry.timestamp.tzinfo is not None:
                    # Keep stored timestamps naive so comparisons with
                    # datetime.now() elsewhere cannot raise TypeError.
                    entry.timestamp = entry.timestamp.astimezone().replace(tzinfo=None)
                # Key by the validated address, as the create endpoints do,
                # not by the unchecked outer key.
                user_data[entry.ip_address] = entry.dict()
                imported_count += 1
            except Exception as e:
                # Best effort: one bad entry must not abort the whole import.
                print(f"Error importing entry for IP {ip}: {e}")

        save_data()
        return {"message": f"Successfully imported {imported_count} user entries."}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error importing data: {e}") from e
|
|
|
|
|
async def scheduled_cleanup():
    """Prune expired entries once an hour, forever.

    A failing pass is reported on stdout and the loop carries on; without
    that, a single exception would end the task and stop all later cleanups.
    """
    while True:
        try:
            clean_old_data()
        except Exception as e:
            # Cancellation is not an Exception subclass on Python 3.8+, so
            # it is not swallowed here and still stops the loop.
            print(f"Error during scheduled cleanup: {e}")
        await asyncio.sleep(60 * 60)
|
|
|
|
|
import asyncio |
|
from fastapi import BackgroundTasks |
|
|
|
@app.on_event("startup")
async def startup_event():
    """Start the periodic cleanup loop when the application starts."""
    # The event loop keeps only a weak reference to tasks. Hold a strong one
    # on the app so the cleanup loop cannot be garbage-collected mid-run.
    app.state.cleanup_task = asyncio.create_task(scheduled_cleanup())
|
|
|
|
|
from fastapi import Request |
|
from fastapi.responses import JSONResponse |
|
|
|
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an ``HTTPException`` as ``{"message": <detail>}``.

    The exception's status code is kept, and any headers attached to it
    are forwarded instead of being dropped.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"message": exc.detail},
        # None when the exception carries no headers.
        headers=getattr(exc, "headers", None),
    )
|
|
|
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Turn any otherwise-unhandled exception into a 500 JSON response."""
    body = {"message": f"An unexpected error occurred: {exc}"}
    return JSONResponse(status_code=500, content=body)
|
|
|
if __name__ == "__main__":
    import uvicorn

    # `debug` is no longer a uvicorn.run() option; newer releases reject the
    # keyword with a TypeError, so it is not passed.
    uvicorn.run(app, host="0.0.0.0", port=8083)