import logging
import tempfile
import os
import json
from collections import OrderedDict

import torch
import numpy as np
from tqdm import tqdm

from maskrcnn_benchmark.modeling.roi_heads.mask_head.inference import Masker
from maskrcnn_benchmark.structures.bounding_box import BoxList
from maskrcnn_benchmark.structures.boxlist_ops import boxlist_iou


def do_coco_evaluation(
    dataset,
    predictions,
    box_only,
    output_folder,
    iou_types,
    expected_results,
    expected_results_sigma_tol,
):
    logger = logging.getLogger("maskrcnn_benchmark.inference")

    if box_only:
        logger.info("Evaluating bbox proposals")
        if dataset.coco is None and output_folder:
            json_results = prepare_for_tsv_detection(predictions, dataset)
            with open(os.path.join(output_folder, "box_proposals.json"), "w") as f:
                json.dump(json_results, f)
            return None
        areas = {"all": "", "small": "s", "medium": "m", "large": "l"}
        res = COCOResults("box_proposal")
        for limit in [100, 1000]:
            for area, suffix in areas.items():
                stats = evaluate_box_proposals(
                    predictions, dataset, area=area, limit=limit
                )
                key = "AR{}@{:d}".format(suffix, limit)
                res.results["box_proposal"][key] = stats["ar"].item()
        logger.info(res)
        check_expected_results(res, expected_results, expected_results_sigma_tol)
        if output_folder:
            torch.save(res, os.path.join(output_folder, "box_proposals.pth"))
        return res, predictions

    logger.info("Preparing results for COCO format")
    coco_results = {}
    if "bbox" in iou_types:
        logger.info("Preparing bbox results")
        if dataset.coco is None:
            coco_results["bbox"] = prepare_for_tsv_detection(predictions, dataset)
        else:
            coco_results["bbox"] = prepare_for_coco_detection(predictions, dataset)
    if "segm" in iou_types:
        logger.info("Preparing segm results")
        coco_results["segm"] = prepare_for_coco_segmentation(predictions, dataset)
    if "keypoints" in iou_types:
        logger.info("Preparing keypoints results")
        coco_results["keypoints"] = prepare_for_coco_keypoint(predictions, dataset)

    results = COCOResults(*iou_types)
    logger.info("Evaluating predictions")
    for iou_type in iou_types:
        with tempfile.NamedTemporaryFile() as f:
            file_path = f.name
            if output_folder:
                file_path = os.path.join(output_folder, iou_type + ".json")
            if dataset.coco:
                res = evaluate_predictions_on_coco(
                    dataset.coco, coco_results[iou_type], file_path, iou_type
                )
                results.update(res)
            elif output_folder:
                with open(file_path, "w") as f:
                    json.dump(coco_results[iou_type], f)
    logger.info(results)
    check_expected_results(results, expected_results, expected_results_sigma_tol)
    if output_folder:
        torch.save(results, os.path.join(output_folder, "coco_results.pth"))
    return results, coco_results
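
# Example usage (a minimal sketch, not part of the original file): how this
# entry point is typically invoked after inference. `dataset` and `predictions`
# here are assumptions -- `predictions` is a list of BoxList, one per image, in
# the dataset's image order.
#
#   results, coco_results = do_coco_evaluation(
#       dataset=dataset,                 # a COCO-style dataset wrapper
#       predictions=predictions,         # list[BoxList] from the model
#       box_only=False,
#       output_folder="output/inference",
#       iou_types=("bbox", "segm"),
#       expected_results=[],
#       expected_results_sigma_tol=4,
#   )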


def prepare_for_tsv_detection(predictions, dataset):
    # assert isinstance(dataset, COCODataset)
    proposal_results = []
    image_list = []
    for im_id, prediction in enumerate(predictions):
        image_info = dataset.get_img_info(im_id)
        if len(prediction) == 0:
            continue

        image_id = image_info["id"]
        image_width = image_info["width"]
        image_height = image_info["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()
        if prediction.has_field("centers"):
            centers = prediction.get_field("centers")
        else:
            centers = None

        for k, box in enumerate(boxes):
            proposal = {
                "image_id": image_id,
                "category_id": labels[k],
                "bbox": box,
                "score": scores[k],
                "area": image_width * image_height,
                "iscrowd": 0,
            }
            if centers is not None:
                proposal.update(center=centers[k].tolist())
            proposal_results.append(proposal)

        image_list.append(image_info)

    # categories = [{'supercategory': 'proposal', 'id': 0, 'name': 'proposal'}]
    return dict(images=image_list, annotations=proposal_results)


def prepare_for_coco_detection(predictions, dataset):
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()

        for k, box in enumerate(boxes):
            if labels[k] in dataset.contiguous_category_id_to_json_id:
                coco_results.append(
                    {
                        "image_id": original_id,
                        "category_id": dataset.contiguous_category_id_to_json_id[labels[k]],
                        "bbox": box,
                        "score": scores[k],
                    }
                )
    return coco_results
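
# For reference, each entry produced above follows the standard COCO detection
# result format (boxes in xywh, absolute pixel coordinates), e.g.:
#
#   {"image_id": 139, "category_id": 18, "bbox": [258.2, 41.3, 348.3, 243.8], "score": 0.98}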


def prepare_for_coco_segmentation(predictions, dataset):
    import pycocotools.mask as mask_util

    masker = Masker(threshold=0.5, padding=1)
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in tqdm(enumerate(predictions)):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        masks = prediction.get_field("mask")

        # The Masker is necessary only if the masks haven't already been
        # resized to the image resolution.
        if list(masks.shape[-2:]) != [image_height, image_width]:
            masks = masker(masks.expand(1, -1, -1, -1, -1), prediction)
            masks = masks[0]

        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()

        rles = [
            mask_util.encode(np.array(mask[0, :, :, np.newaxis], order="F"))[0]
            for mask in masks
        ]
        for rle in rles:
            rle["counts"] = rle["counts"].decode("utf-8")

        mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]

        coco_results.extend(
            [
                {
                    "image_id": original_id,
                    "category_id": mapped_labels[k],
                    "segmentation": rle,
                    "score": scores[k],
                }
                for k, rle in enumerate(rles)
            ]
        )
    return coco_results
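
# The RLE strings above are the standard pycocotools format and can be
# round-tripped for sanity checks. A minimal sketch (assumes `rle` is one of
# the "segmentation" values produced above):
#
#   import pycocotools.mask as mask_util
#   rle = dict(rle)                                # copy before mutating
#   rle["counts"] = rle["counts"].encode("utf-8")  # restore bytes for the C API
#   binary_mask = mask_util.decode(rle)            # (H, W) uint8 array
#   print(binary_mask.sum(), "foreground pixels")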


def prepare_for_coco_keypoint(predictions, dataset):
    # assert isinstance(dataset, COCODataset)
    coco_results = []
    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]
        if len(prediction.bbox) == 0:
            continue

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))
        prediction = prediction.convert("xywh")

        boxes = prediction.bbox.tolist()
        scores = prediction.get_field("scores").tolist()
        labels = prediction.get_field("labels").tolist()
        keypoints = prediction.get_field("keypoints")
        keypoints = keypoints.resize((image_width, image_height))
        keypoints = keypoints.to_coco_format()

        mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]

        coco_results.extend(
            [
                {
                    "image_id": original_id,
                    "category_id": mapped_labels[k],
                    "keypoints": keypoint,
                    "score": scores[k],
                }
                for k, keypoint in enumerate(keypoints)
            ]
        )
    return coco_results


# Inspired by Detectron.
def evaluate_box_proposals(
    predictions, dataset, thresholds=None, area="all", limit=None
):
    """Evaluate detection proposal recall metrics. This function is a much
    faster alternative to the official COCO API recall evaluation code, but
    it produces slightly different results.
    """
    # Record the max overlap value for each gt box.
    # Return a vector of these overlap values.
    areas = {
        "all": 0,
        "small": 1,
        "medium": 2,
        "large": 3,
        "96-128": 4,
        "128-256": 5,
        "256-512": 6,
        "512-inf": 7,
    }
    area_ranges = [
        [0 ** 2, 1e5 ** 2],    # all
        [0 ** 2, 32 ** 2],     # small
        [32 ** 2, 96 ** 2],    # medium
        [96 ** 2, 1e5 ** 2],   # large
        [96 ** 2, 128 ** 2],   # 96-128
        [128 ** 2, 256 ** 2],  # 128-256
        [256 ** 2, 512 ** 2],  # 256-512
        [512 ** 2, 1e5 ** 2],  # 512-inf
    ]
    assert area in areas, "Unknown area range: {}".format(area)
    area_range = area_ranges[areas[area]]
    gt_overlaps = []
    num_pos = 0

    for image_id, prediction in enumerate(predictions):
        original_id = dataset.id_to_img_map[image_id]

        # TODO replace with get_img_info?
        image_width = dataset.coco.imgs[original_id]["width"]
        image_height = dataset.coco.imgs[original_id]["height"]
        prediction = prediction.resize((image_width, image_height))

        # Sort predictions in descending order of confidence.
        # TODO maybe remove this and make it explicit in the documentation
        if prediction.has_field("objectness"):
            inds = prediction.get_field("objectness").sort(descending=True)[1]
        else:
            inds = prediction.get_field("scores").sort(descending=True)[1]
        prediction = prediction[inds]

        ann_ids = dataset.coco.getAnnIds(imgIds=original_id)
        anno = dataset.coco.loadAnns(ann_ids)
        gt_boxes = [obj["bbox"] for obj in anno if obj["iscrowd"] == 0]
        gt_boxes = torch.as_tensor(gt_boxes).reshape(-1, 4)  # guard against no boxes
        gt_boxes = BoxList(gt_boxes, (image_width, image_height), mode="xywh").convert(
            "xyxy"
        )
        gt_areas = torch.as_tensor([obj["area"] for obj in anno if obj["iscrowd"] == 0])

        if len(gt_boxes) == 0:
            continue

        valid_gt_inds = (gt_areas >= area_range[0]) & (gt_areas <= area_range[1])
        gt_boxes = gt_boxes[valid_gt_inds]

        num_pos += len(gt_boxes)

        if len(gt_boxes) == 0:
            continue

        if len(prediction) == 0:
            continue

        if limit is not None and len(prediction) > limit:
            prediction = prediction[:limit]

        overlaps = boxlist_iou(prediction, gt_boxes)

        _gt_overlaps = torch.zeros(len(gt_boxes))
        for j in range(min(len(prediction), len(gt_boxes))):
            # find which proposal box maximally covers each gt box
            # and get the iou amount of coverage for each gt box
            max_overlaps, argmax_overlaps = overlaps.max(dim=0)

            # find which gt box is 'best' covered (i.e. 'best' = most iou)
            gt_ovr, gt_ind = max_overlaps.max(dim=0)
            assert gt_ovr >= 0
            # find the proposal box that covers the best covered gt box
            box_ind = argmax_overlaps[gt_ind]
            # record the iou coverage of this gt box
            _gt_overlaps[j] = overlaps[box_ind, gt_ind]
            assert _gt_overlaps[j] == gt_ovr
            # mark the proposal box and the gt box as used
            overlaps[box_ind, :] = -1
            overlaps[:, gt_ind] = -1

        # append recorded iou coverage level
        gt_overlaps.append(_gt_overlaps)

    if len(gt_overlaps) == 0:
        return {
            "ar": torch.zeros(1),
            "recalls": torch.zeros(1),
            "thresholds": thresholds,
            "gt_overlaps": gt_overlaps,
            "num_pos": num_pos,
        }

    gt_overlaps = torch.cat(gt_overlaps, dim=0)
    gt_overlaps, _ = torch.sort(gt_overlaps)

    if thresholds is None:
        step = 0.05
        thresholds = torch.arange(0.5, 0.95 + 1e-5, step, dtype=torch.float32)
    recalls = torch.zeros_like(thresholds)
    # compute recall for each iou threshold
    for i, t in enumerate(thresholds):
        recalls[i] = (gt_overlaps >= t).float().sum() / float(num_pos)
    # ar = 2 * np.trapz(recalls, thresholds)
    ar = recalls.mean()
    return {
        "ar": ar,
        "recalls": recalls,
        "thresholds": thresholds,
        "gt_overlaps": gt_overlaps,
        "num_pos": num_pos,
    }
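
# Example usage (a hedged sketch, assuming `predictions` and `dataset` as in
# do_coco_evaluation above): average recall of the top-1000 proposals over
# small ground-truth boxes.
#
#   stats = evaluate_box_proposals(predictions, dataset, area="small", limit=1000)
#   print("ARs@1000 = {:.4f} over {} positives".format(stats["ar"].item(), stats["num_pos"]))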


def evaluate_predictions_on_coco(
    coco_gt, coco_results, json_result_file, iou_type="bbox"
):
    with open(json_result_file, "w") as f:
        json.dump(coco_results, f)

    from pycocotools.coco import COCO
    from pycocotools.cocoeval import COCOeval

    coco_dt = coco_gt.loadRes(str(json_result_file)) if coco_results else COCO()
    # coco_dt = coco_gt.loadRes(coco_results)
    if iou_type == "keypoints":
        coco_gt = filter_valid_keypoints(coco_gt, coco_dt)
    coco_eval = COCOeval(coco_gt, coco_dt, iou_type)
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    if iou_type == "bbox":
        summarize_per_category(coco_eval, json_result_file.replace(".json", ".csv"))
    return coco_eval
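
# After summarize(), coco_eval.stats holds the twelve standard COCO numbers in
# a fixed order; the first few, for bbox/segm:
#
#   coco_eval.stats[0]  # AP   @ IoU=0.50:0.95
#   coco_eval.stats[1]  # AP50 @ IoU=0.50
#   coco_eval.stats[2]  # AP75 @ IoU=0.75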


def summarize_per_category(coco_eval, csv_output=None):
    '''
    Compute and display per-category summary metrics for evaluation results.
    Note this function can *only* be applied with the default parameter setting.
    '''

    def _summarize(iouThr=None, areaRng='all', maxDets=100):
        p = coco_eval.params
        titleStr = 'Average Precision'
        typeStr = '(AP)'
        iouStr = '{:0.2f}:{:0.2f}'.format(p.iouThrs[0], p.iouThrs[-1]) \
            if iouThr is None else '{:0.2f}'.format(iouThr)
        result_str = ' {:<18} {} @[ IoU={:<9} | area={:>6s} | maxDets={:>3d} ], '. \
            format(titleStr, typeStr, iouStr, areaRng, maxDets)
        aind = [i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng]
        mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets]
        # dimension of precision: [TxRxKxAxM]
        s = coco_eval.eval['precision']
        # IoU
        if iouThr is not None:
            t = np.where(iouThr == p.iouThrs)[0]
            s = s[t]
        s = s[:, :, :, aind, mind]
        if len(s[s > -1]) == 0:
            mean_s = -1
        else:
            mean_s = np.mean(s[s > -1])
        # calculate AP (average precision) for each category
        num_classes = len(p.catIds)
        avg_ap = 0.0
        for i in range(0, num_classes):
            result_str += '{}, '.format(np.mean(s[:, :, i, :]))
            avg_ap += np.mean(s[:, :, i, :])
        result_str += '{} \n'.format(avg_ap / num_classes)
        return result_str

    id2name = {}
    for _, cat in coco_eval.cocoGt.cats.items():
        id2name[cat['id']] = cat['name']
    title_str = 'metric, '
    for cid in coco_eval.params.catIds:
        title_str += '{}, '.format(id2name[cid])
    title_str += 'avg \n'

    results = [title_str]
    results.append(_summarize())
    results.append(_summarize(iouThr=.5, maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng='small', maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng='medium', maxDets=coco_eval.params.maxDets[2]))
    results.append(_summarize(areaRng='large', maxDets=coco_eval.params.maxDets[2]))

    # Guard against the documented default of csv_output=None.
    if csv_output:
        with open(csv_output, 'w') as f:
            for result in results:
                f.write(result)


def filter_valid_keypoints(coco_gt, coco_dt):
    # Mask ground-truth keypoint visibility flags with the validity pattern of
    # the predicted keypoints (taken from the first detection), so COCOeval
    # ignores keypoints the model never predicts.
    kps = coco_dt.anns[1]['keypoints']
    for id, ann in coco_gt.anns.items():
        ann['keypoints'][2::3] = [a * b for a, b in zip(ann['keypoints'][2::3], kps[2::3])]
        ann['num_keypoints'] = sum(ann['keypoints'][2::3])
    return coco_gt


class COCOResults(object):
    METRICS = {
        "bbox": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
        "segm": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
        "box_proposal": [
            "AR@100",
            "ARs@100",
            "ARm@100",
            "ARl@100",
            "AR@1000",
            "ARs@1000",
            "ARm@1000",
            "ARl@1000",
        ],
        "keypoints": ["AP", "AP50", "AP75", "APm", "APl"],
    }

    def __init__(self, *iou_types):
        allowed_types = ("box_proposal", "bbox", "segm", "keypoints")
        assert all(iou_type in allowed_types for iou_type in iou_types)
        results = OrderedDict()
        for iou_type in iou_types:
            results[iou_type] = OrderedDict(
                [(metric, -1) for metric in COCOResults.METRICS[iou_type]]
            )
        self.results = results

    def update(self, coco_eval):
        if coco_eval is None:
            return

        from pycocotools.cocoeval import COCOeval

        assert isinstance(coco_eval, COCOeval)
        s = coco_eval.stats
        iou_type = coco_eval.params.iouType
        res = self.results[iou_type]
        metrics = COCOResults.METRICS[iou_type]
        for idx, metric in enumerate(metrics):
            res[metric] = s[idx]

    def __repr__(self):
        # TODO make it pretty
        return repr(self.results)
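
# A minimal sketch of how COCOResults is filled (names as defined above):
#
#   results = COCOResults("bbox")
#   results.update(coco_eval)   # coco_eval from evaluate_predictions_on_coco
#   print(results.results["bbox"]["AP50"])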


def check_expected_results(results, expected_results, sigma_tol):
    if not expected_results:
        return

    logger = logging.getLogger("maskrcnn_benchmark.inference")
    for task, metric, (mean, std) in expected_results:
        actual_val = results.results[task][metric]
        lo = mean - sigma_tol * std
        hi = mean + sigma_tol * std
        ok = (lo < actual_val) and (actual_val < hi)
        msg = (
            "{} > {} sanity check (actual vs. expected): "
            "{:.3f} vs. mean={:.4f}, std={:.4f}, range=({:.4f}, {:.4f})"
        ).format(task, metric, actual_val, mean, std, lo, hi)
        if not ok:
            msg = "FAIL: " + msg
            logger.error(msg)
        else:
            msg = "PASS: " + msg
            logger.info(msg)
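
# Example of the expected_results format consumed above (the numbers here are
# illustrative, not from any real run):
#
#   expected_results = [("bbox", "AP", (0.387, 0.002))]
#   check_expected_results(results, expected_results, sigma_tol=4)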