|
from typing import List, Dict, Optional |
|
from datetime import datetime, timedelta |
|
from fastapi import FastAPI, HTTPException, Query, Body, Request |
|
from pydantic import BaseModel, validator, root_validator |
|
import json |
|
import os |
|
from user_agents import parse |
|
|
|
app = FastAPI() |
|
|
|
|
|
user_data: Dict[str, dict] = {} |
|
|
|
|
|
class UserEntry(BaseModel): |
|
ip_address: str |
|
device_type: str = "N/A" |
|
timestamp: datetime = datetime.now() |
|
browser: str = "N/A" |
|
OS: str = "N/A" |
|
|
|
@validator("ip_address") |
|
def validate_ip_address(cls, value): |
|
parts = value.split('.') |
|
if len(parts) != 4: |
|
raise ValueError("Invalid IP address format") |
|
for part in parts: |
|
try: |
|
num = int(part) |
|
if not 0 <= num <= 255: |
|
raise ValueError("Invalid IP address value") |
|
except ValueError: |
|
raise ValueError("Invalid IP address value") |
|
return value |
|
|
|
@root_validator(pre=True) |
|
def set_default_values(cls, values): |
|
"""Set default values for missing fields.""" |
|
defaults = { |
|
"device_type": "N/A", |
|
"browser": "N/A", |
|
"OS": "N/A", |
|
"timestamp": datetime.now() |
|
} |
|
for key, default_value in defaults.items(): |
|
if key not in values or values[key] is None: |
|
values[key] = default_value |
|
return values |
|
|
|
def clean_old_data(): |
|
"""Deletes data older than 7 days.""" |
|
global user_data |
|
cutoff_time = datetime.now() - timedelta(days=7) |
|
ips_to_delete = [ip for ip, entry in user_data.items() if entry["timestamp"] < cutoff_time] |
|
for ip in ips_to_delete: |
|
del user_data[ip] |
|
|
|
|
|
@app.post("/auto_entry/", response_model=UserEntry, status_code=201) |
|
async def create_auto_user_entry( |
|
request: Request |
|
): |
|
""" |
|
Endpoint to automatically record user entry by extracting the IP address |
|
from the request and taking device_type and optional timestamp as input. |
|
""" |
|
try: |
|
|
|
ip_address = request.client.host |
|
if "x-forwarded-for" in request.headers: |
|
ip_address = request.headers["x-forwarded-for"].split(",")[0] |
|
|
|
user_agent = request.headers.get("User-Agent", "N/A") |
|
user_agent_parsed = parse(user_agent) |
|
|
|
device_type = "Mobile" if user_agent_parsed.is_mobile else "Tablet" if user_agent_parsed.is_tablet else "Desktop" |
|
browser_name = user_agent_parsed.browser.family if user_agent_parsed else "N/A" |
|
os_name = user_agent_parsed.os.family if user_agent_parsed else "N/A" |
|
|
|
timestamp = datetime.now() |
|
|
|
|
|
entry_data = UserEntry( |
|
ip_address=ip_address, |
|
device_type=device_type, |
|
timestamp=timestamp, |
|
browser=browser_name, |
|
OS=os_name |
|
) |
|
|
|
|
|
user_data[ip_address] = entry_data.dict() |
|
|
|
return entry_data |
|
|
|
except ValueError as ve: |
|
raise HTTPException(status_code=400, detail=str(ve)) |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Internal server error: {e}") |
|
|
|
@app.get("/analytics/") |
|
async def get_user_analytics(period: str = Query(..., enum=["last_hour", "last_day", "last_7_day"])): |
|
"""Endpoint to get advanced user analytics.""" |
|
try: |
|
clean_old_data() |
|
|
|
now = datetime.now() |
|
if period == "last_hour": |
|
cutoff = now - timedelta(hours=1) |
|
elif period == "last_day": |
|
cutoff = now - timedelta(days=1) |
|
elif period == "last_7_day": |
|
cutoff = now - timedelta(days=7) |
|
else: |
|
raise HTTPException(status_code=400, detail="Invalid period specified") |
|
|
|
filtered_data = [entry for entry in user_data.values() if entry["timestamp"] >= cutoff] |
|
unique_users = len(set(entry["ip_address"] for entry in filtered_data)) |
|
device_counts: Dict[str, int] = {} |
|
browser_counts: Dict[str, int] = {} |
|
os_counts: Dict[str, int] = {} |
|
|
|
for entry in filtered_data: |
|
device_counts[entry["device_type"]] = device_counts.get(entry["device_type"], 0) + 1 |
|
browser_counts[entry["browser"]] = browser_counts.get(entry["browser"], 0) + 1 |
|
os_counts[entry["OS"]] = os_counts.get(entry["OS"], 0) + 1 |
|
|
|
|
|
total_entries = len(filtered_data) |
|
device_percentages = {device: (count / total_entries) * 100 for device, count in device_counts.items()} |
|
browser_percentages = {browser: (count / total_entries) * 100 for browser, count in browser_counts.items()} |
|
os_percentages = {os: (count / total_entries) * 100 for os, count in os_counts.items()} |
|
|
|
return { |
|
"total_unique_users": unique_users, |
|
"device_type_info": { |
|
"counts": device_counts, |
|
"percentages": device_percentages |
|
}, |
|
"browser_info": { |
|
"counts": browser_counts, |
|
"percentages": browser_percentages |
|
}, |
|
"os_info": { |
|
"counts": os_counts, |
|
"percentages": os_percentages |
|
}, |
|
"total_entries": total_entries |
|
} |
|
except HTTPException as he: |
|
raise he |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Error generating analytics: {e}") |
|
|
|
|
|
@app.get("/export/", response_model=Dict[str, UserEntry]) |
|
async def export_user_data(): |
|
"""Endpoint to export all user data in JSON format.""" |
|
try: |
|
clean_old_data() |
|
return user_data |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Error exporting data: {e}") |
|
|
|
@app.post("/import/") |
|
async def import_user_data(data: Dict[str, dict] = Body(...)): |
|
"""Endpoint to import user data from JSON.""" |
|
try: |
|
imported_count = 0 |
|
for ip, entry_dict in data.items(): |
|
try: |
|
|
|
entry = UserEntry(**entry_dict) |
|
entry.timestamp = datetime.fromisoformat(entry_dict.get("timestamp", datetime.now().isoformat())) |
|
user_data[ip] = entry.dict() |
|
imported_count += 1 |
|
except Exception as e: |
|
print(f"Error importing entry for IP {ip}: {e}") |
|
return {"message": f"Successfully imported {imported_count} user entries."} |
|
except json.JSONDecodeError: |
|
raise HTTPException(status_code=400, detail="Invalid JSON format") |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Error importing data: {e}") |
|
|
|
|
|
user_data: Dict[str, dict] = {} |
|
poll_data: Dict[str, dict] = {} |
|
poll_responses: Dict[str, Dict[str, int]] = {} |
|
|
|
|
|
class PollCreate(BaseModel): |
|
poll_name: str |
|
question: str |
|
options: List[str] |
|
|
|
class PollEntry(BaseModel): |
|
poll_name: str |
|
response: int |
|
|
|
|
|
@app.post("/poll/create/", status_code=201) |
|
async def create_poll(poll: PollCreate): |
|
"""Endpoint to create a new poll.""" |
|
if poll.poll_name in poll_data: |
|
raise HTTPException(status_code=400, detail="Poll with this name already exists.") |
|
poll_data[poll.poll_name] = { |
|
"question": poll.question, |
|
"options": poll.options, |
|
"created_at": datetime.now() |
|
} |
|
poll_responses[poll.poll_name] = {} |
|
return {"message": "Poll created successfully.", "poll_name": poll.poll_name} |
|
|
|
@app.post("/poll/entry/", status_code=201) |
|
async def create_poll_entry(request: Request, poll_entry: PollEntry): |
|
"""Endpoint to record a user's response to a poll.""" |
|
ip_address = request.client.host |
|
if "x-forwarded-for" in request.headers: |
|
ip_address = request.headers["x-forwarded-for"].split(",")[0] |
|
|
|
if poll_entry.poll_name not in poll_data: |
|
raise HTTPException(status_code=404, detail="Poll not found.") |
|
|
|
if poll_entry.response < 1 or poll_entry.response > len(poll_data[poll_entry.poll_name]["options"]): |
|
raise HTTPException(status_code=400, detail="Invalid response option.") |
|
|
|
poll_responses[poll_entry.poll_name][ip_address] = poll_entry.response |
|
return {"message": "Poll entry recorded successfully."} |
|
|
|
@app.get("/poll/analytics/") |
|
async def get_poll_analytics(poll_name: str = Query(..., description="Name of the poll")): |
|
"""Endpoint to get analytics for a specific poll.""" |
|
if poll_name not in poll_data: |
|
raise HTTPException(status_code=404, detail="Poll not found.") |
|
|
|
responses = poll_responses[poll_name] |
|
response_counts = {option: 0 for option in range(1, len(poll_data[poll_name]["options"]) + 1)} |
|
for response in responses.values(): |
|
response_counts[response] += 1 |
|
|
|
return { |
|
"poll_name": poll_name, |
|
"question": poll_data[poll_name]["question"], |
|
"options": poll_data[poll_name]["options"], |
|
"response_counts": response_counts, |
|
"total_responses": len(responses) |
|
} |
|
|
|
|
|
async def scheduled_poll_cleanup(): |
|
"""Periodically clean up old polls.""" |
|
while True: |
|
now = datetime.now() |
|
polls_to_delete = [poll_name for poll_name, poll in poll_data.items() if (now - poll["created_at"]).days >= 7] |
|
for poll_name in polls_to_delete: |
|
del poll_data[poll_name] |
|
del poll_responses[poll_name] |
|
await asyncio.sleep(60 * 60 * 24) |
|
|
|
|
|
async def scheduled_cleanup(): |
|
"""Periodically clean up old data.""" |
|
while True: |
|
clean_old_data() |
|
await asyncio.sleep(60 * 60) |
|
|
|
|
|
import asyncio |
|
from fastapi import BackgroundTasks |
|
|
|
@app.on_event("startup") |
|
async def startup_event(): |
|
|
|
asyncio.create_task(scheduled_cleanup()) |
|
asyncio.create_task(scheduled_poll_cleanup()) |
|
pass |
|
|
|
|
|
from fastapi import Request |
|
from fastapi.responses import JSONResponse |
|
|
|
@app.exception_handler(HTTPException) |
|
async def http_exception_handler(request: Request, exc: HTTPException): |
|
return JSONResponse( |
|
status_code=exc.status_code, |
|
content={"message": exc.detail}, |
|
) |
|
|
|
@app.exception_handler(Exception) |
|
async def general_exception_handler(request: Request, exc: Exception): |
|
return JSONResponse( |
|
status_code=500, |
|
content={"message": f"An unexpected error occurred: {exc}"}, |
|
) |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=8083, debug=True) |