File size: 4,539 Bytes
8c290e6
 
 
bceb9b7
 
c2219a8
2c8258c
058dfe9
bceb9b7
 
8c290e6
bceb9b7
 
 
058dfe9
 
bceb9b7
 
 
 
 
 
 
 
8c290e6
 
 
 
 
 
 
 
bceb9b7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
058dfe9
bceb9b7
 
 
 
 
 
 
 
 
26bf52a
 
 
 
 
 
 
 
c2219a8
 
 
 
26bf52a
 
 
 
04d1a17
c2219a8
 
 
04d1a17
c2219a8
 
 
 
 
 
 
 
04d1a17
 
c2219a8
 
 
04d1a17
 
 
 
 
 
 
c2219a8
bceb9b7
 
 
 
 
058dfe9
bceb9b7
058dfe9
bceb9b7
 
 
 
 
 
 
058dfe9
8c290e6
 
 
 
 
 
 
 
 
 
 
 
2c8258c
 
 
 
8c290e6
 
 
2c8258c
058dfe9
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, PlainTextResponse, Response
from os import getenv
from pathlib import Path
from typing import Any

import httpx
import json
import subprocess

# Request log in the user's home directory: the HTTP middleware appends one
# JSON object per line, and the analytics endpoints read it back.
LOGFILE = Path.home() / "a.json"


class PrettyJSONResponse(Response):
    """Response class that serializes its content as indented JSON."""

    media_type = "application/json"

    def render(self, content: Any) -> bytes:
        """Return *content* as UTF-8 encoded JSON with a two-space indent."""
        pretty = json.dumps(content, indent=2)
        return pretty.encode("utf-8")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background scheduler for the lifetime of the application.

    The scheduler is started before the application begins serving and shut
    down when the context exits.
    """
    scheduler.start()
    try:
        yield
    finally:
        # Shut down even if an exception is thrown into the context manager,
        # so the scheduler is not left running.
        scheduler.shutdown()


# The application object; `lifespan` is passed in so the scheduler start/stop
# logic defined there is tied to the app.
app = FastAPI(lifespan=lifespan)


@app.middleware("http")
async def log_request(request: Request, call_next: Any):
    """Append a one-line JSON record of the request to LOGFILE, then serve it.

    The record holds a compact local timestamp (``day`` as YYMMDD and ``dt``
    as YYMMDDHHMMSS plus milliseconds), the URL, query parameters, client
    host, HTTP method and headers. Values that are not natively JSON
    serializable are written via ``str``.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    ts = datetime.now().strftime("%y%m%d%H%M%S%f")
    # request.client can be None (no peer address available); log null rather
    # than failing the whole request with an AttributeError.
    client_host = request.client.host if request.client else None
    data = {
        "day": int(ts[:6]),
        # Drop the last three digits of the microseconds -> millisecond precision.
        "dt": int(ts[:-3]),
        "url": request.url,
        "query_params": request.query_params,
        "client": client_host,
        "method": request.method,
        "headers": dict(request.headers),
    }
    output = json.dumps(
        obj=data,
        default=str,
        indent=None,
        separators=(", ", ":"),
    )
    with open(LOGFILE, "a", encoding="utf-8") as f:
        # Records are newline-separated with no trailing newline: only prefix
        # a separator when the file already has content.
        separator = "\n" if f.tell() else ""
        f.write(separator + output)

    response = await call_next(request)
    return response


@app.get("/")
def read_root(request: Request):
    """Main URL returning an executable installer script.

    Query parameters can be used to install specific things, e.g.
    curl -fsSL "https://pup-py-fetch.hf.space?python=3.11&pixi=marimo&myenv=cowsay,duckdb"

    Package specs "duckdb>=1.1" are not supported.

    A PowerShell script is returned when the User-Agent header contains
    "Windows"; otherwise (including when the header is missing) a bash
    script is returned. The ``python`` parameter selects the Python version
    (default "3.12"), ``pixi`` is a comma-separated list of pixi packages,
    ``logs`` is ignored, and every other parameter is treated as a venv name
    whose value is a comma-separated list of packages.
    """

    query_params = dict(request.query_params)
    py_ver = query_params.pop("python", "3.12")
    # The header may be absent; default to "" so the membership test below
    # does not raise TypeError on None.
    user_agent = request.headers.get("user-agent", "")
    if "Windows" in user_agent:
        pup_url = "https://raw.githubusercontent.com/liquidcarbon/puppy/main/pup.ps1"
        script = [
            f"$pup_ps1 = (iwr -useb {pup_url}).Content",
            f"& ([scriptblock]::Create($pup_ps1)) {py_ver}",
        ]
        hint = f"""iex (iwr "{request.url}").Content"""
    else:
        pup_url = "https://raw.githubusercontent.com/liquidcarbon/puppy/main/pup.sh"
        script = [f"curl -fsSL {pup_url} | bash -s {py_ver}"]
        hint = f"""curl -fsSL "{request.url}" | bash"""

    # NOTE(review): query parameter values are interpolated into the script
    # unescaped, and the script is meant to be piped into a shell. Consider
    # validating names against an allow-list pattern.
    pixi_packages = query_params.pop("pixi", "")
    if pixi_packages:
        script.extend(f"pixi add {pkg}" for pkg in pixi_packages.split(","))

    # remaining query params are venvs
    for venv, uv_packages in query_params.items():
        if venv in ["logs"]:
            continue
        script.extend(f"pup add {venv} {pkg}" for pkg in uv_packages.split(","))

    script.extend(
        [
            "# 🐶 scripts end here; if you like what you see, copy-paste this recipe, or run like so:",
            f"# {hint}",
            "# to learn more, visit https://github.com/liquidcarbon/puppy\n",
        ]
    )
    return PlainTextResponse("\n".join(script))


@app.get("/a", response_class=PrettyJSONResponse)
def get_analytics(n: int = 5):
    """Return the most recent request-log records as a list.

    Args:
        n: Number of trailing log lines to return; 0 returns the whole log.

    Returns:
        A list with one parsed JSON object per non-blank log line. The list
        is empty when the command produces no output.
    """
    path = LOGFILE.as_posix()
    # Build the argument list directly: splitting a command string on
    # whitespace would break on a log path that contains spaces.
    cmd = ["cat", path] if n == 0 else ["tail", f"-{n}", path]
    raw = subprocess.run(cmd, capture_output=True).stdout
    # Each record occupies one line; parse line by line and skip blanks so a
    # stray empty line cannot invalidate the whole result.
    return [json.loads(line) for line in raw.split(b"\n") if line.strip()]


@app.api_route("/qa", response_class=FileResponse, methods=["GET", "HEAD"])
def query_analytics():
    """Return the path of the request log; the route's response class is FileResponse."""
    log_path = LOGFILE.as_posix()
    return log_path


@app.get("/favicon.ico")
async def favicon():
    """Answer favicon requests with a small greeting payload."""
    payload = {"message": "woof!"}
    return payload


@app.get("/ping")
async def ping():
    """Liveness endpoint: always returns the same greeting payload."""
    payload = {"message": "woof!"}
    return payload


def self_ping():
    """Send a GET request to this service's own ``/ping`` endpoint.

    The host is read from the ``SPACE_HOST`` environment variable and falls
    back to ``http://0.0.0.0:7860`` when it is unset.
    """
    self_host = getenv("SPACE_HOST", "http://0.0.0.0:7860")
    # SPACE_HOST may be a bare hostname without a scheme; httpx needs an
    # absolute URL, so default to https in that case. Values that already
    # carry a scheme are used unchanged.
    if "://" not in self_host:
        self_host = f"https://{self_host}"
    with httpx.Client() as client:
        client.get(f"{self_host}/ping")


# Background scheduler that calls `self_ping` once a minute. It is only
# configured here; `lifespan` starts it and shuts it down.
scheduler = AsyncIOScheduler(executors={"default": AsyncIOExecutor()})
scheduler.add_job(self_ping, "interval", minutes=1)


if __name__ == "__main__":
    import uvicorn

    # Give the access and default uvicorn log formatters the same compact
    # timestamped layout before starting the server.
    log_config = uvicorn.config.LOGGING_CONFIG
    for formatter_name in ("access", "default"):
        formatter = log_config["formatters"][formatter_name]
        formatter["datefmt"] = "%y%m%d @ %T"
        formatter["fmt"] = "%(asctime)s %(levelprefix)s %(message)s"
    uvicorn.run(app, host="0.0.0.0", port=7860)