from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score
import random
import os

from .utils.evaluation import ImageEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

from dotenv import load_dotenv
load_dotenv()

# Dependencies for inference
import logging
from pathlib import Path
from ultralytics import YOLO
from torch import device
from torch.cuda import is_available

router = APIRouter()

DESCRIPTION = "Frugal Object Detector for forest fires"
ROUTE = "/image"
def parse_boxes(annotation_string):
"""Parse multiple boxes from a single annotation string.
Each box has 5 values: class_id, x_center, y_center, width, height"""
values = [float(x) for x in annotation_string.strip().split()]
boxes = []
# Each box has 5 values
for i in range(0, len(values), 5):
if i + 5 <= len(values):
# Skip class_id (first value) and take the next 4 values
box = values[i+1:i+5]
boxes.append(box)
return boxes
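# Illustrative example (not part of the pipeline): a two-box annotation string in
# YOLO format would be parsed as follows.
#   parse_boxes("0 0.50 0.40 0.20 0.10 0 0.70 0.60 0.15 0.05")
#   -> [[0.5, 0.4, 0.2, 0.1], [0.7, 0.6, 0.15, 0.05]]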
def compute_iou(box1, box2):
"""Compute Intersection over Union (IoU) between two YOLO format boxes."""
# Convert YOLO format (x_center, y_center, width, height) to corners
def yolo_to_corners(box):
x_center, y_center, width, height = box
x1 = x_center - width/2
y1 = y_center - height/2
x2 = x_center + width/2
y2 = y_center + height/2
return np.array([x1, y1, x2, y2])
box1_corners = yolo_to_corners(box1)
box2_corners = yolo_to_corners(box2)
# Calculate intersection
x1 = max(box1_corners[0], box2_corners[0])
y1 = max(box1_corners[1], box2_corners[1])
x2 = min(box1_corners[2], box2_corners[2])
y2 = min(box1_corners[3], box2_corners[3])
intersection = max(0, x2 - x1) * max(0, y2 - y1)
# Calculate union
box1_area = (box1_corners[2] - box1_corners[0]) * (box1_corners[3] - box1_corners[1])
box2_area = (box2_corners[2] - box2_corners[0]) * (box2_corners[3] - box2_corners[1])
union = box1_area + box2_area - intersection
return intersection / (union + 1e-6)
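# Worked example (illustrative only): two boxes of size 0.2 x 0.2 whose centers are
# offset by 0.1 along x overlap on half their width, so
#   compute_iou([0.5, 0.5, 0.2, 0.2], [0.6, 0.5, 0.2, 0.2])
# gives intersection 0.1 * 0.2 = 0.02 and union 0.04 + 0.04 - 0.02 = 0.06,
# i.e. an IoU of roughly 0.33; identical boxes give ~1.0 and disjoint boxes give 0.0.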
def compute_max_iou(true_boxes, pred_box):
"""Compute maximum IoU between a predicted box and all true boxes"""
max_iou = 0
for true_box in true_boxes:
iou = compute_iou(true_box, pred_box)
max_iou = max(max_iou, iou)
return max_iou
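# Illustrative example (not part of the pipeline): with two ground-truth boxes, the
# prediction is scored against its best match.
#   compute_max_iou([[0.5, 0.5, 0.2, 0.2], [0.8, 0.8, 0.1, 0.1]],
#                   [0.5, 0.5, 0.2, 0.2])
#   -> close to 1.0 (near-perfect overlap with the first true box)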
@router.post(ROUTE, tags=["Image Task"],
             description=DESCRIPTION)
async def evaluate_image(request: ImageEvaluationRequest):
"""
Evaluate image classification and object detection for forest fire smoke.
Current Model: Yolo11 nano
Metrics:
- Classification accuracy: Whether an image contains smoke or not
- Object Detection accuracy: IoU (Intersection over Union) for smoke bounding boxes
"""
    # Get space info
    username, space_url = get_space_info()

    # Load and prepare the dataset
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed)
    test_dataset = train_test["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Update the code below to replace the random baseline with your model inference
    #--------------------------------------------------------------------------------------------

    THRESHOLD = 0.18
    IMGSIZE = 1280

    # Load model
    model_path = Path("tasks", "models")
    model_name = "best_gpu_fp16.pt"
    logging.info(f"Loading model {model_name}")
    model = YOLO(Path(model_path, model_name), task="detect")
    device_name = device("cuda" if is_available() else "cpu")
    model.to(device_name)
    # Preprocessing the annotations before the loop to avoid repeated parsing
    annotations = [example.get("annotations", "").strip() for example in test_dataset]
    true_labels = [int(len(ann) > 0) for ann in annotations]

    # Initialize lists
    predictions = []
    true_boxes_list = []
    pred_boxes = []

    logging.info(f"Inference start on device: {device_name}")
    for i, example in enumerate(test_dataset):
        has_smoke = true_labels[i]
        annotation = annotations[i]

        # Make prediction
        results = model.predict(example["image"], device=device_name, conf=THRESHOLD, verbose=False, imgsz=IMGSIZE)[0]
        pred_has_smoke = len(results) > 0
        predictions.append(int(pred_has_smoke))

        # If there's a true box, parse it and add box prediction
        if has_smoke:
            # Parse all true boxes from the annotation
            image_true_boxes = parse_boxes(annotation)
            true_boxes_list.append(image_true_boxes)

            # Append bounding box for the prediction
            if results.boxes.cls.numel() != 0:
                # xywhn: normalized (x_center, y_center, width, height) of the first predicted box
                pred_boxes.append(results.boxes[0].xywhn.tolist()[0])
            else:
                # No box predicted for an image that does contain smoke: scored as IoU 0
                pred_boxes.append([0, 0, 0, 0])
    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate classification metrics
    classification_accuracy = accuracy_score(true_labels, predictions)
    classification_precision = precision_score(true_labels, predictions)
    classification_recall = recall_score(true_labels, predictions)

    # Calculate mean IoU for object detection (only for images with smoke)
    # For each image, we compute the max IoU between the predicted box and all true boxes
    ious = []
    for true_boxes, pred_box in zip(true_boxes_list, pred_boxes):
        max_iou = compute_max_iou(true_boxes, pred_box)
        ious.append(max_iou)
    mean_iou = float(np.mean(ious)) if ious else 0.0
    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "classification_precision": float(classification_precision),
        "classification_recall": float(classification_recall),
        "mean_iou": mean_iou,
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }

    return results
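# Minimal usage sketch (assumption, not part of this module): the router is expected to be
# mounted in a FastAPI app elsewhere in the Space, after which the evaluation is triggered
# with a POST to /image. The module path, port, and payload values below are illustrative;
# the payload fields simply mirror the ImageEvaluationRequest fields used above.
#
#   from fastapi import FastAPI
#   from tasks.image import router as image_router  # assumed module path
#
#   app = FastAPI()
#   app.include_router(image_router)
#
#   # e.g. curl -X POST http://localhost:7860/image \
#   #   -H "Content-Type: application/json" \
#   #   -d '{"dataset_name": "<dataset repo id>", "test_size": 0.2, "test_seed": 42}'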