|
from datetime import datetime |
|
from fastapi import FastAPI, Request |
|
from fastapi.responses import Response, FileResponse |
|
from typing import Any |
|
|
|
import json |
|
import subprocess |
|
|
|
|
|
class PrettyJSONResponse(Response): |
|
media_type = "application/json" |
|
|
|
def render(self, content: Any) -> bytes: |
|
return json.dumps(content, indent=2).encode("utf-8") |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
@app.middleware("http") |
|
async def log_request(request: Request, call_next: Any): |
|
ts = datetime.now().strftime("%y%m%d%H%M%S%f") |
|
data = { |
|
"day": int(ts[:6]), |
|
"dt": int(ts[:-3]), |
|
"url": request.url, |
|
"query_params": request.query_params, |
|
"client": request.client.host, |
|
"method": request.method, |
|
"headers": dict(request.headers), |
|
} |
|
output = json.dumps( |
|
obj=data, |
|
default=str, |
|
indent=None, |
|
separators=(", ", ":"), |
|
) |
|
with open("a.json", "a") as f: |
|
separator = "\n" if f.tell() else "" |
|
f.write(separator + output) |
|
|
|
response = await call_next(request) |
|
return response |
|
|
|
|
|
@app.get("/") |
|
def read_root(request: Request): |
|
return {"Hello": request.client.host} |
|
|
|
|
|
@app.get("/a", response_class=PrettyJSONResponse) |
|
def get_analytics(n: int = 5): |
|
if n == 0: |
|
cmd = "cat a.json" |
|
else: |
|
cmd = f"tail -{n} a.json" |
|
json_lines = subprocess.run(cmd.split(), capture_output=True).stdout |
|
content = json.loads(f"[{json_lines.replace(b"\n", b",").decode()}]") |
|
return content |
|
|
|
|
|
@app.api_route("/qa", response_class=FileResponse, methods=["GET", "HEAD"]) |
|
def query_analytics(): |
|
return "a.json" |
|
|