Spaces:
Runtime error
Runtime error
File size: 1,825 Bytes
2eafbc4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI
from inference.core.env import METRICS_INTERVAL
from inference.core.version import __version__
from inference.enterprise.device_manager.command_handler import (
Command,
fetch_commands,
handle_command,
)
from inference.enterprise.device_manager.metrics_service import (
report_metrics_and_handle_commands,
)
# Contact and license metadata passed to FastAPI for the API description.
# NOTE(review): the email value below looks like an obfuscation placeholder
# rather than a real address — confirm the intended contact email.
_CONTACT_INFO = {
    "name": "Roboflow Inc.",
    "url": "https://roboflow.com/contact",
    "email": "[email protected]",
}
_LICENSE_INFO = {
    "name": "Apache 2.0",
    "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
}

# Application object for the device manager HTTP API; the route decorators
# in this module register their handlers on it.
app = FastAPI(
    title="Roboflow Device Manager",
    description=(
        "The device manager enables remote control and monitoring "
        "of Roboflow inference server containers"
    ),
    version=__version__,
    terms_of_service="https://roboflow.com/terms",
    contact=_CONTACT_INFO,
    license_info=_LICENSE_INFO,
    root_path="/",
)
@app.get("/")
def root():
    """Return static metadata describing the device manager service.

    Returns:
        A dict holding the service name, the package ``__version__``, the
        terms-of-service URL, and nested ``contact`` and ``license_info``
        dicts.
    """
    # NOTE(review): the email value looks like an obfuscation placeholder
    # rather than a real address — confirm the intended contact email.
    return {
        "name": "Roboflow Device Manager",
        "version": __version__,
        "terms_of_service": "https://roboflow.com/terms",
        "contact": {
            "name": "Roboflow Inc.",
            "url": "https://roboflow.com/contact",
            "email": "[email protected]",
        },
        "license_info": {
            "name": "Apache 2.0",
            "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
        },
    }
@app.post("/exec_command")
async def exec_command(command: Command):
    """Hand a received command to ``handle_command`` and acknowledge it.

    Args:
        command: The ``Command`` payload for this request; it is converted
            to a plain dict via ``.dict()`` before being dispatched.

    Returns:
        ``{"status": "ok"}`` once ``handle_command`` has returned. Any
        exception raised by ``handle_command`` propagates to the caller.
    """
    # NOTE(review): handle_command is invoked synchronously inside a
    # coroutine — if it can block, confirm that stalling the event loop for
    # its duration is acceptable.
    handle_command(command.dict())
    return {"status": "ok"}
# Background scheduler for the periodic jobs below. It is created and started
# at import time, so importing this module is enough to begin scheduling.
scheduler = BackgroundScheduler(
    job_defaults={"max_instances": 3, "coalesce": True},
)

# Run report_metrics_and_handle_commands every METRICS_INTERVAL seconds.
scheduler.add_job(
    func=report_metrics_and_handle_commands,
    trigger="interval",
    seconds=METRICS_INTERVAL,
)

# Run fetch_commands every 3 seconds.
scheduler.add_job(func=fetch_commands, trigger="interval", seconds=3)

scheduler.start()
|