Spaces:
Runtime error
Runtime error
File size: 9,478 Bytes
2eafbc4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
import base64
import time
from dataclasses import dataclass
from datetime import datetime
import requests
import docker
from inference.core.cache import cache
from inference.core.env import METRICS_INTERVAL
from inference.core.logger import logger
from inference.core.utils.image_utils import load_image_rgb
from inference.enterprise.device_manager.helpers import get_cache_model_items
@dataclass
class InferServerContainer:
    """Wrapper around a Docker SDK container running a Roboflow inference server.

    Pairs the raw Docker container handle with parsed metadata (host, port,
    version, startup time) and exposes lifecycle (kill/restart/stop/start) and
    introspection (inspect/snapshot) helpers.
    """

    status: str
    id: str
    port: int
    host: str
    startup_time: float
    version: str

    def __init__(self, docker_container, details):
        """Initialize from a Docker SDK container and a parsed details dict.

        Args:
            docker_container: Docker SDK ``Container`` object (kept on
                ``self.container`` for lifecycle operations).
            details (dict): Parsed container info (see ``parse_container_info``),
                optionally merged with the server's ``/info`` response.
        """
        self.container = docker_container
        self.status = details.get("status")
        self.id = details.get("uuid")
        self.port = details.get("port")
        self.host = details.get("host")
        self.version = details.get("version")
        ts = details.get("startup_time_ts")
        if ts:
            # Docker reports RFC3339 timestamps, usually with fractional
            # seconds (e.g. "2023-05-01T12:00:00.123456789Z"). Drop the
            # fraction and any trailing "Z" so strptime can parse it.
            # (The previous code split BEFORE the None check, so a missing
            # timestamp raised AttributeError, and a fraction-less "…Z"
            # timestamp broke strptime.)
            t = ts.split(".")[0].rstrip("Z")
            self.startup_time = datetime.strptime(
                t, "%Y-%m-%dT%H:%M:%S"
            ).timestamp()
        else:
            # Fall back to "now" when the start timestamp is unavailable.
            self.startup_time = datetime.now().timestamp()

    def kill(self):
        """Force-kill the container. Returns (success, None)."""
        try:
            self.container.kill()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def restart(self):
        """Restart the container. Returns (success, None)."""
        try:
            self.container.restart()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def stop(self):
        """Stop the container. Returns (success, None)."""
        try:
            self.container.stop()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def start(self):
        """Start the container. Returns (success, None)."""
        try:
            self.container.start()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def inspect(self):
        """Query the inference server's /info endpoint.

        Returns:
            tuple: (True, info dict) on success, (False, None) on failure.
        """
        try:
            info = requests.get(f"http://{self.host}:{self.port}/info").json()
            return True, info
        except Exception as e:
            logger.error(e)
            return False, None

    def snapshot(self):
        """Collect the latest inferred images for this container.

        Returns:
            tuple: (True, snapshot dict including "container_id") on success,
            (False, None) on failure.
        """
        try:
            snapshot = self.get_latest_inferred_images()
            snapshot.update({"container_id": self.id})
            return True, snapshot
        except Exception as e:
            logger.error(e)
            return False, None

    def get_latest_inferred_images(self, max=4):
        """
        Retrieve the latest inferred images and associated information for this container.

        This method fetches the most recent inferred images within the time
        interval defined by METRICS_INTERVAL.

        Args:
            max (int, optional): The maximum number of inferred images to retrieve.
                Defaults to 4. (Name kept for backward compatibility with
                keyword callers, despite shadowing the builtin.)

        Returns:
            dict: A dictionary where each key represents a model ID associated with this
                container, and the corresponding value is a list of dictionaries containing
                information about the latest inferred images. Each dictionary has the following keys:
                - "image" (str): The base64-encoded image data.
                - "dimensions" (dict): Image dimensions (width and height).
                - "predictions" (list): A list of predictions or results associated with the image.

        Notes:
            - This method uses the global constant METRICS_INTERVAL to specify the time interval.
        """
        now = time.time()
        start = now - METRICS_INTERVAL
        # The cache maps container id -> api_key -> [model_ids].
        api_keys = get_cache_model_items().get(self.id, dict()).keys()
        model_ids = []
        for api_key in api_keys:
            mids = get_cache_model_items().get(self.id, dict()).get(api_key, [])
            model_ids.extend(mids)
        num_images = 0
        latest_inferred_images = dict()
        for model_id in model_ids:
            if num_images >= max:
                break
            latest_reqs = cache.zrangebyscore(
                f"inference:{self.id}:{model_id}", min=start, max=now
            )
            for req in latest_reqs:
                # Enforce the cap inside the inner loops too, so the result
                # never exceeds `max` images (previously only checked per model).
                if num_images >= max:
                    break
                images = req["request"]["image"]
                image_dims = req.get("response", {}).get("image", dict())
                predictions = req.get("response", {}).get("predictions", [])
                if images is None or len(images) == 0:
                    continue
                if not isinstance(images, list):
                    images = [images]
                for image in images:
                    if num_images >= max:
                        break
                    if image["type"] == "base64":
                        value = image["value"]
                    else:
                        # Non-base64 inputs are loaded and re-encoded so the
                        # snapshot payload is uniformly base64.
                        loaded_image = load_image_rgb(image)
                        image_bytes = loaded_image.tobytes()
                        value = base64.b64encode(image_bytes).decode("utf-8")
                    latest_inferred_images.setdefault(model_id, [])
                    inference = dict(
                        image=value, dimensions=image_dims, predictions=predictions
                    )
                    latest_inferred_images[model_id].append(inference)
                    num_images += 1
        return latest_inferred_images

    def get_startup_config(self):
        """
        Get the startup configuration for this container.

        Returns:
            dict: A dictionary containing the startup configuration for this
            container: "env", "port_bindings", "detach", "image", "privileged",
            "labels".
        """
        env_vars = self.container.attrs.get("Config", {}).get("Env", {})
        port_bindings = self.container.attrs.get("HostConfig", {}).get(
            "PortBindings", {}
        )
        # NOTE(review): "Detached" is read from HostConfig here, but it does
        # not appear to be a standard Docker HostConfig key — confirm; the
        # False default makes this safe either way.
        detached = self.container.attrs.get("HostConfig", {}).get("Detached", False)
        image = self.container.attrs.get("Config", {}).get("Image", "")
        privileged = self.container.attrs.get("HostConfig", {}).get("Privileged", False)
        labels = self.container.attrs.get("Config", {}).get("Labels", {})
        # Env entries are already "NAME=value" strings; the previous
        # split/rejoin loop crashed on values containing "=" (ValueError on
        # unpacking) and was otherwise a no-op, so just copy the list.
        env = list(env_vars)
        return {
            "env": env,
            "port_bindings": port_bindings,
            "detach": detached,
            "image": image,
            "privileged": privileged,
            "labels": labels,
            # TODO: add device requests
        }
def is_inference_server_container(container):
    """
    Determine whether a Docker container runs the Roboflow inference server.

    Args:
        container (any): A container object from the Docker SDK

    Returns:
        boolean: True if any of the container image's tags identifies a
        Roboflow inference server image, False otherwise
    """
    return any(
        tag.startswith("roboflow/roboflow-inference-server")
        for tag in container.image.tags
    )
def get_inference_containers():
    """
    Discovers inference server containers running on the host
    and parses their information into a list of InferServerContainer objects.

    Returns:
        list: InferServerContainer objects, one per unique container id.
    """
    client = docker.from_env()
    containers = client.containers.list()
    inference_containers = []
    seen_ids = set()
    for c in containers:
        if not is_inference_server_container(c):
            continue
        details = parse_container_info(c)
        info = {}
        try:
            # Best-effort enrichment with the server's own /info payload;
            # failures are logged and the Docker-derived details are kept.
            info = requests.get(
                f"http://{details['host']}:{details['port']}/info", timeout=3
            ).json()
        except Exception as e:
            logger.error(f"Failed to get info from container {c.id} {details} {e}")
        details.update(info)
        infer_container = InferServerContainer(c, details)
        # De-duplicate by container id. (The previous loop used `continue` to
        # skip only one element and appended once per non-matching element
        # while mutating the list it was iterating, producing duplicates.)
        if infer_container.id not in seen_ids:
            seen_ids.add(infer_container.id)
            inference_containers.append(infer_container)
    return inference_containers
def parse_container_info(c):
    """
    Parses the container information into a dictionary.

    Args:
        c (any): Docker SDK Container object

    Returns:
        dict: Always contains "container_id", "port" (default 9001), and
        "host" (default "0.0.0.0"); adds "status", "container_name_on_host",
        and "startup_time_ts" when Docker reports them.
    """
    env = c.attrs.get("Config", {}).get("Env", {})
    info = {"container_id": c.id, "port": 9001, "host": "0.0.0.0"}
    for var in env:
        # Split on the first "=" only, so values that themselves contain "="
        # are preserved intact (plain split("=")[1] truncated them).
        if var.startswith("PORT="):
            info["port"] = var.split("=", 1)[1]
        elif var.startswith("HOST="):
            info["host"] = var.split("=", 1)[1]
    status = c.attrs.get("State", {}).get("Status")
    if status:
        info["status"] = status
    container_name = c.attrs.get("Name")
    if container_name:
        info["container_name_on_host"] = container_name
    startup_time = c.attrs.get("State", {}).get("StartedAt")
    if startup_time:
        info["startup_time_ts"] = startup_time
    return info
def get_container_by_id(id):
    """
    Gets an inference server container by its id

    Args:
        id (string): The id of the container

    Returns:
        container: The container object if found, None otherwise
    """
    # Lazily scan the discovered containers and stop at the first id match.
    return next(
        (container for container in get_inference_containers() if container.id == id),
        None,
    )
def get_container_ids():
    """
    Gets the ids of the inference server containers

    Returns:
        list: A list of container ids
    """
    ids = []
    for container in get_inference_containers():
        ids.append(container.id)
    return ids
|