"""User Tracking API.

A FastAPI service that records client sightings (IP address, device type,
timestamp) in an in-memory list, reports unique-user and per-device counts
for a recent time window, and supports JSON export/import of the stored
entries.
"""

import asyncio
import ipaddress
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator

# ASGI application object; the route, startup and exception-handler decorators
# in this module all register against it.
app = FastAPI(title="User Tracking API")

class UserEntry(BaseModel):
    """Request body for ``POST /user_entry``: one sighting of a client.

    ``ip_address`` is checked and canonicalised by :meth:`validate_ip`.
    ``timestamp`` defaults to the current naive-UTC time when omitted; the
    field is ``Optional``, so an explicit ``null`` is accepted as well.
    """

    ip_address: str
    device_type: str
    timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow)

    @validator('ip_address')
    def validate_ip(cls, v):
        """Accept any well-formed IPv4 or IPv6 address and return it canonicalised.

        Parsing is delegated to the standard ``ipaddress`` module instead of
        splitting on dots by hand, so IPv6 clients are no longer rejected and
        non-ASCII "digits" (which ``str.isdigit`` accepts) no longer pass.
        The canonical string form is returned so that two spellings of the
        same IPv6 address compare equal when entries are de-duplicated by IP.

        Raises:
            ValueError: If ``v`` is not a valid IP address.
        """
        try:
            return str(ipaddress.ip_address(v))
        except ValueError:
            # Keep the message callers already see for malformed addresses.
            raise ValueError('Invalid IP address format') from None

class DataResponse(BaseModel):
    """Response body of ``GET /user_data``."""

    # Number of distinct ``ip_address`` values among the entries in the window.
    total_unique_users: int
    # Maps each ``device_type`` to the number of entries in the window with it.
    device_info: Dict[str, int]

class ImportData(BaseModel):
    """Request body of ``POST /import_data``."""

    # Entries that will replace the whole store; ``import_data_from_json``
    # requires each to carry 'ip_address', 'device_type' and 'timestamp'.
    data: List[Dict]

class ExportData(BaseModel):
    """Response body of ``GET /export_data``."""

    # Snapshot of every entry currently held in the in-memory store.
    data: List[Dict]

# In-memory data storage: a list of entry dicts. The helpers in this module
# read and replace ``data_store`` only while holding ``data_lock``. Nothing is
# persisted, so the contents are lost when the process exits.
# NOTE(review): the lock is created at import time; on Python < 3.10 asyncio
# primitives capture an event loop at construction - confirm the server runs
# on that same loop if older interpreters must be supported.
data_store: List[Dict] = []
data_lock = asyncio.Lock()

async def add_user_entry(entry: Dict):
    """Store ``entry``, superseding any stored entry that has the same IP.

    The store is rebuilt without entries whose ``'ip_address'`` equals the
    new one, and ``entry`` is placed at the end. The swap happens while
    ``data_lock`` is held.

    Args:
        entry: Entry dict; must contain an ``'ip_address'`` key.
    """
    global data_store
    async with data_lock:
        survivors = []
        for existing in data_store:
            if existing['ip_address'] != entry['ip_address']:
                survivors.append(existing)
        survivors.append(entry)
        data_store = survivors

def _timestamp_as_naive_utc(raw):
    """Return ``raw`` (a ``datetime`` or ISO-8601 string) as a naive UTC datetime."""
    stamp = raw if isinstance(raw, datetime) else datetime.fromisoformat(raw)
    offset = stamp.utcoffset()
    if offset is not None:
        # Aware value: shift to UTC and drop tzinfo so it can be compared with
        # the naive ``datetime.utcnow()`` based cutoff.
        stamp = stamp.replace(tzinfo=None) - offset
    return stamp


async def get_filtered_data(time_filter: str):
    """Return the stored entries whose timestamp falls inside a recent window.

    Stored timestamps may be ISO-8601 strings (as produced by an export) or
    ``datetime`` objects (as produced by ``UserEntry.dict()``), naive or
    timezone-aware; all are compared as naive UTC. Previously a ``datetime``
    value or an aware timestamp raised ``TypeError``.

    Args:
        time_filter: One of ``'last_hour'``, ``'last_day'``, ``'last_7_days'``.

    Returns:
        A new list holding the matching entry dicts, in store order.

    Raises:
        ValueError: If ``time_filter`` is not a supported name, or a stored
            timestamp string is not valid ISO-8601.
    """
    windows = {
        "last_hour": timedelta(hours=1),
        "last_day": timedelta(days=1),
        "last_7_days": timedelta(days=7),
    }
    if time_filter not in windows:
        raise ValueError("Invalid time filter. Choose from 'last_hour', 'last_day', 'last_7_days'.")
    cutoff = datetime.utcnow() - windows[time_filter]

    async with data_lock:
        return [e for e in data_store if _timestamp_as_naive_utc(e['timestamp']) >= cutoff]

def _serialize_timestamp(value):
    """``json.dumps`` fallback: render ``datetime`` as ISO-8601, reject anything else."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


async def export_data_to_json():
    """Serialise the whole store to a JSON string (4-space indent).

    Entries added through ``/user_entry`` can carry ``datetime`` timestamps,
    which ``json.dumps`` cannot encode by itself; they are written in ISO-8601
    form. Any other non-JSON type still raises ``TypeError``.

    Returns:
        The JSON text of the current store, taken while holding ``data_lock``.
    """
    async with data_lock:
        return json.dumps(data_store, indent=4, default=_serialize_timestamp)

async def import_data_from_json(json_data: List[Dict]):
    """Replace the whole store with ``json_data`` after validating every entry.

    Each entry must be a dict containing ``'ip_address'``, ``'device_type'``
    and ``'timestamp'``, and the timestamp must be a ``datetime`` or an
    ISO-8601 string. Timestamps are stored back as naive-UTC ISO-8601 strings
    so that later ``datetime.fromisoformat`` parsing and comparison against
    ``datetime.utcnow()`` cannot fail on imported data.

    Validation finishes before the store is touched, so a rejected payload
    leaves the existing data intact. The caller's dicts are not modified.

    Args:
        json_data: Entries to install.

    Raises:
        ValueError: ``"Invalid data format"`` if any entry is malformed.
    """
    global data_store
    required_keys = ('ip_address', 'device_type', 'timestamp')

    validated = []
    for entry in json_data:
        if not isinstance(entry, dict) or any(key not in entry for key in required_keys):
            raise ValueError("Invalid data format")
        raw = entry['timestamp']
        try:
            stamp = raw if isinstance(raw, datetime) else datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            raise ValueError("Invalid data format") from None
        offset = stamp.utcoffset()
        if offset is not None:
            # Aware timestamp: convert to UTC and drop tzinfo.
            stamp = stamp.replace(tzinfo=None) - offset
        validated.append({**entry, 'timestamp': stamp.isoformat()})

    async with data_lock:
        data_store = validated

@app.post("/user_entry", status_code=201)
async def user_entry(entry: UserEntry):
    """Record one client sighting, replacing any earlier entry for the same IP.

    The timestamp is stored as a naive-UTC ISO-8601 string. The rest of this
    module parses stored timestamps with ``datetime.fromisoformat`` and dumps
    the store with ``json.dumps``; the raw ``datetime`` that ``entry.dict()``
    yields breaks both.

    Args:
        entry: Validated request body.

    Returns:
        A confirmation message (sent with HTTP 201).

    Raises:
        HTTPException: 400 if a ``ValueError`` occurs while storing the entry,
            500 for any other failure.
    """
    try:
        entry_dict = entry.dict()
        # ``timestamp`` is Optional, so an explicit null falls back to "now".
        stamp = entry.timestamp if entry.timestamp is not None else datetime.utcnow()
        offset = stamp.utcoffset()
        if offset is not None:
            # Client sent an aware timestamp: convert to UTC and drop tzinfo so
            # it compares cleanly with the naive ``utcnow()`` cutoffs.
            stamp = stamp.replace(tzinfo=None) - offset
        entry_dict['timestamp'] = stamp.isoformat()
        await add_user_entry(entry_dict)
        return {"message": "User entry added successfully"}
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve)) from ve
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Internal Server Error") from exc

@app.get("/user_data", response_model=DataResponse)
async def get_user_data(time_filter: str = "last_day"):
    """Summarise the entries recorded inside the requested time window.

    Args:
        time_filter: Window name forwarded to ``get_filtered_data``.

    Returns:
        A dict with the number of distinct IP addresses and a per-device
        entry count for the window.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while filtering,
            500 for any other failure.
    """
    try:
        recent = await get_filtered_data(time_filter)
        seen_ips = set()
        for record in recent:
            seen_ips.add(record['ip_address'])
        per_device = {}
        for record in recent:
            kind = record['device_type']
            if kind in per_device:
                per_device[kind] += 1
            else:
                per_device[kind] = 1
        return {"total_unique_users": len(seen_ips), "device_info": per_device}
    except ValueError as bad_filter:
        raise HTTPException(status_code=400, detail=str(bad_filter))
    except Exception:
        raise HTTPException(status_code=500, detail="Internal Server Error")

@app.get("/export_data", response_model=ExportData)
async def export_data():
    """Return every stored entry under the ``"data"`` key.

    The store is serialised by ``export_data_to_json`` and parsed back, so
    the response carries plain JSON values.

    Raises:
        HTTPException: 500 if serialising or re-parsing the store fails.
    """
    try:
        snapshot = json.loads(await export_data_to_json())
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to export data")
    else:
        return {"data": snapshot}

@app.post("/import_data", status_code=200)
async def import_data(import_payload: ImportData):
    """Replace the stored entries with the ones supplied in the request body.

    Args:
        import_payload: Request body whose ``data`` list is handed to
            ``import_data_from_json``.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 400 when the payload is rejected with ``ValueError``,
            500 for any other failure.
    """
    try:
        await import_data_from_json(import_payload.data)
    except ValueError as rejected:
        raise HTTPException(status_code=400, detail=str(rejected))
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to import data")
    else:
        return {"message": "Data imported successfully"}

async def cleanup_old_data(interval_seconds: float = 3600, retention_days: float = 7):
    """Periodically drop entries older than the retention period; never returns.

    Each cycle sleeps for ``interval_seconds`` and then rebuilds the store
    without entries whose timestamp is older than ``retention_days``.
    Timestamps may be ``datetime`` objects or ISO-8601 strings, naive or
    aware. An entry whose timestamp is missing or unparseable is kept rather
    than deleted, and no longer ends the loop with an exception.

    Args:
        interval_seconds: Pause between cleanup passes (default: one hour).
        retention_days: Maximum age of an entry in days (default: seven).
    """
    global data_store

    def _is_stale(entry, cutoff):
        # True only when the entry has a readable timestamp older than cutoff.
        try:
            raw = entry['timestamp']
            stamp = raw if isinstance(raw, datetime) else datetime.fromisoformat(raw)
            offset = stamp.utcoffset()
            if offset is not None:
                # Aware value: compare in naive UTC, like the cutoff.
                stamp = stamp.replace(tzinfo=None) - offset
            return stamp < cutoff
        except (KeyError, TypeError, ValueError):
            return False

    while True:
        await asyncio.sleep(interval_seconds)
        cutoff = datetime.utcnow() - timedelta(days=retention_days)
        async with data_lock:
            original_length = len(data_store)
            data_store = [e for e in data_store if not _is_stale(e, cutoff)]
            cleaned = original_length - len(data_store)
            if cleaned > 0:
                print(f"Cleaned up {cleaned} old entries.")

# Strong references to fire-and-forget tasks. The event loop itself only keeps
# weak references, so a task nobody else holds may be garbage-collected while
# it is still running.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Start the periodic cleanup loop when the application starts."""
    task = asyncio.create_task(cleanup_old_data())
    _background_tasks.add(task)
    # Release the reference once the task finishes or is cancelled.
    task.add_done_callback(_background_tasks.discard)

@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an ``HTTPException`` as ``{"error": <detail>}`` with its status code.

    Headers attached to the exception (for example ``WWW-Authenticate``) are
    forwarded to the response; previously they were dropped.

    Args:
        request: The request being handled (unused).
        exc: The exception raised by a route.

    Returns:
        A JSON response carrying the exception's status code, detail and
        headers.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail},
        headers=getattr(exc, "headers", None),
    )

@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Answer any otherwise unhandled exception with a generic HTTP 500 body.

    The exception's own message is deliberately not included in the response.
    """
    generic_body = {"error": "An unexpected error occurred."}
    return JSONResponse(content=generic_body, status_code=500)

if __name__ == "__main__":
    # Serve the app with uvicorn when this file is executed as a script.
    # uvicorn is imported here, so it is only needed on this code path.
    # NOTE(review): host "0.0.0.0" listens on all network interfaces - confirm
    # that is intended outside of local development.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8083)