import logging
import tempfile
import os
import torch
import numpy as np
import json
from collections import OrderedDict
from tqdm import tqdm
from maskrcnn_benchmark.modeling.roi_heads.mask_head.inference import Masker
from maskrcnn_benchmark.structures.bounding_box import BoxList
from maskrcnn_benchmark.structures.boxlist_ops import boxlist_iou
def do_coco_evaluation(
dataset,
predictions,
box_only,
output_folder,
iou_types,
expected_results,
expected_results_sigma_tol,
):
logger = logging.getLogger("maskrcnn_benchmark.inference")
if box_only:
logger.info("Evaluating bbox proposals")
if dataset.coco is None and output_folder:
json_results = prepare_for_tsv_detection(predictions, dataset)
with open(os.path.join(output_folder, "box_proposals.json"), "w") as f:
json.dump(json_results, f)
return None
areas = {"all": "", "small": "s", "medium": "m", "large": "l"}
res = COCOResults("box_proposal")
for limit in [100, 1000]:
for area, suffix in areas.items():
stats = evaluate_box_proposals(
predictions, dataset, area=area, limit=limit
)
key = "AR{}@{:d}".format(suffix, limit)
res.results["box_proposal"][key] = stats["ar"].item()
logger.info(res)
check_expected_results(res, expected_results, expected_results_sigma_tol)
if output_folder:
torch.save(res, os.path.join(output_folder, "box_proposals.pth"))
return res, predictions
logger.info("Preparing results for COCO format")
coco_results = {}
if "bbox" in iou_types:
logger.info("Preparing bbox results")
if dataset.coco is None:
coco_results["bbox"] = prepare_for_tsv_detection(predictions, dataset)
else:
coco_results["bbox"] = prepare_for_coco_detection(predictions, dataset)
if "segm" in iou_types:
logger.info("Preparing segm results")
coco_results["segm"] = prepare_for_coco_segmentation(predictions, dataset)
    if "keypoints" in iou_types:
        logger.info("Preparing keypoints results")
        coco_results["keypoints"] = prepare_for_coco_keypoint(predictions, dataset)
results = COCOResults(*iou_types)
logger.info("Evaluating predictions")
for iou_type in iou_types:
with tempfile.NamedTemporaryFile() as f:
file_path = f.name
if output_folder:
file_path = os.path.join(output_folder, iou_type + ".json")
if dataset.coco:
res = evaluate_predictions_on_coco(
dataset.coco, coco_results[iou_type], file_path, iou_type
)
results.update(res)
elif output_folder:
with open(file_path, "w") as f:
json.dump(coco_results[iou_type], f)
logger.info(results)
check_expected_results(results, expected_results, expected_results_sigma_tol)
if output_folder:
torch.save(results, os.path.join(output_folder, "coco_results.pth"))
return results, coco_results
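# Example usage (a sketch only; assumes `dataset` is a COCO-style dataset
# object and `predictions` is a per-image list of BoxList predictions, as
# used throughout this file):
#
#     results, coco_results = do_coco_evaluation(
#         dataset,
#         predictions,
#         box_only=False,
#         output_folder="./eval",
#         iou_types=("bbox", "segm"),
#         expected_results=[],
#         expected_results_sigma_tol=4,
#     )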
def prepare_for_tsv_detection(predictions, dataset):
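    """Convert predictions into a COCO-like dict of images and annotations
    (boxes in xywh) for datasets that have no COCO ground truth attached."""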
# assert isinstance(dataset, COCODataset)
proposal_results = []
image_list = []
for im_id, prediction in enumerate(predictions):
image_info = dataset.get_img_info(im_id)
if len(prediction) == 0:
continue
# TODO replace with get_img_info?
image_id = image_info["id"]
image_width = image_info["width"]
image_height = image_info["height"]
prediction = prediction.resize((image_width, image_height))
prediction = prediction.convert("xywh")
boxes = prediction.bbox.tolist()
scores = prediction.get_field("scores").tolist()
labels = prediction.get_field("labels").tolist()
if prediction.has_field("centers"):
centers = prediction.get_field("centers")
else:
centers = None
for k, box in enumerate(boxes):
proposal = {
"image_id": image_id,
"category_id": labels[k],
"bbox": box,
"score": scores[k],
"area": image_width * image_height,
"iscrowd": 0,
}
if centers is not None:
proposal.update(center=centers[k].tolist())
proposal_results.append(proposal)
image_list.append(image_info)
# categories = [{'supercategory': 'proposal', 'id': 0, 'name': 'proposal'}]
return dict(images=image_list, annotations=proposal_results)
def prepare_for_coco_detection(predictions, dataset):
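    """Convert predictions into the COCO detection-results format: a list of
    {image_id, category_id, bbox, score} dicts with boxes in xywh and labels
    mapped back to the original COCO category ids."""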
# assert isinstance(dataset, COCODataset)
coco_results = []
for image_id, prediction in enumerate(predictions):
original_id = dataset.id_to_img_map[image_id]
if len(prediction) == 0:
continue
# TODO replace with get_img_info?
image_width = dataset.coco.imgs[original_id]["width"]
image_height = dataset.coco.imgs[original_id]["height"]
prediction = prediction.resize((image_width, image_height))
prediction = prediction.convert("xywh")
boxes = prediction.bbox.tolist()
scores = prediction.get_field("scores").tolist()
labels = prediction.get_field("labels").tolist()
for k, box in enumerate(boxes):
if labels[k] in dataset.contiguous_category_id_to_json_id:
coco_results.append(
{
"image_id": original_id,
"category_id": dataset.contiguous_category_id_to_json_id[labels[k]],
"bbox": box,
"score": scores[k],
})
return coco_results
def prepare_for_coco_segmentation(predictions, dataset):
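    """Convert predicted masks into COCO segmentation results, encoding each
    binary mask as an RLE via pycocotools."""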
    import pycocotools.mask as mask_util
masker = Masker(threshold=0.5, padding=1)
# assert isinstance(dataset, COCODataset)
coco_results = []
    for image_id, prediction in tqdm(enumerate(predictions), total=len(predictions)):
original_id = dataset.id_to_img_map[image_id]
if len(prediction) == 0:
continue
# TODO replace with get_img_info?
image_width = dataset.coco.imgs[original_id]["width"]
image_height = dataset.coco.imgs[original_id]["height"]
prediction = prediction.resize((image_width, image_height))
masks = prediction.get_field("mask")
# t = time.time()
# Masker is necessary only if masks haven't been already resized.
if list(masks.shape[-2:]) != [image_height, image_width]:
masks = masker(masks.expand(1, -1, -1, -1, -1), prediction)
masks = masks[0]
# logger.info('Time mask: {}'.format(time.time() - t))
# prediction = prediction.convert('xywh')
# boxes = prediction.bbox.tolist()
scores = prediction.get_field("scores").tolist()
labels = prediction.get_field("labels").tolist()
# rles = prediction.get_field('mask')
rles = [
mask_util.encode(np.array(mask[0, :, :, np.newaxis], order="F"))[0]
for mask in masks
]
for rle in rles:
rle["counts"] = rle["counts"].decode("utf-8")
mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]
coco_results.extend(
[
{
"image_id": original_id,
"category_id": mapped_labels[k],
"segmentation": rle,
"score": scores[k],
}
for k, rle in enumerate(rles)
]
)
return coco_results
def prepare_for_coco_keypoint(predictions, dataset):
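    """Convert predicted keypoints into the COCO keypoint-results format,
    after resizing them to the original image size."""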
# assert isinstance(dataset, COCODataset)
coco_results = []
for image_id, prediction in enumerate(predictions):
original_id = dataset.id_to_img_map[image_id]
if len(prediction.bbox) == 0:
continue
# TODO replace with get_img_info?
image_width = dataset.coco.imgs[original_id]['width']
image_height = dataset.coco.imgs[original_id]['height']
prediction = prediction.resize((image_width, image_height))
prediction = prediction.convert('xywh')
boxes = prediction.bbox.tolist()
scores = prediction.get_field('scores').tolist()
labels = prediction.get_field('labels').tolist()
keypoints = prediction.get_field('keypoints')
keypoints = keypoints.resize((image_width, image_height))
keypoints = keypoints.to_coco_format()
mapped_labels = [dataset.contiguous_category_id_to_json_id[i] for i in labels]
coco_results.extend([{
'image_id': original_id,
'category_id': mapped_labels[k],
'keypoints': keypoint,
'score': scores[k]} for k, keypoint in enumerate(keypoints)])
return coco_results
# inspired by Detectron
def evaluate_box_proposals(
predictions, dataset, thresholds=None, area="all", limit=None
):
"""Evaluate detection proposal recall metrics. This function is a much
faster alternative to the official COCO API recall evaluation code. However,
it produces slightly different results.
"""
# Record max overlap value for each gt box
# Return vector of overlap values
areas = {
"all": 0,
"small": 1,
"medium": 2,
"large": 3,
"96-128": 4,
"128-256": 5,
"256-512": 6,
"512-inf": 7,
}
area_ranges = [
[0 ** 2, 1e5 ** 2], # all
[0 ** 2, 32 ** 2], # small
[32 ** 2, 96 ** 2], # medium
[96 ** 2, 1e5 ** 2], # large
[96 ** 2, 128 ** 2], # 96-128
[128 ** 2, 256 ** 2], # 128-256
[256 ** 2, 512 ** 2], # 256-512
        [512 ** 2, 1e5 ** 2],  # 512-inf
    ]
assert area in areas, "Unknown area range: {}".format(area)
area_range = area_ranges[areas[area]]
gt_overlaps = []
num_pos = 0
for image_id, prediction in enumerate(predictions):
original_id = dataset.id_to_img_map[image_id]
# TODO replace with get_img_info?
image_width = dataset.coco.imgs[original_id]["width"]
image_height = dataset.coco.imgs[original_id]["height"]
prediction = prediction.resize((image_width, image_height))
# sort predictions in descending order
# TODO maybe remove this and make it explicit in the documentation
if prediction.has_field("objectness"):
inds = prediction.get_field("objectness").sort(descending=True)[1]
else:
inds = prediction.get_field("scores").sort(descending=True)[1]
prediction = prediction[inds]
ann_ids = dataset.coco.getAnnIds(imgIds=original_id)
anno = dataset.coco.loadAnns(ann_ids)
gt_boxes = [obj["bbox"] for obj in anno if obj["iscrowd"] == 0]
gt_boxes = torch.as_tensor(gt_boxes).reshape(-1, 4) # guard against no boxes
gt_boxes = BoxList(gt_boxes, (image_width, image_height), mode="xywh").convert(
"xyxy"
)
gt_areas = torch.as_tensor([obj["area"] for obj in anno if obj["iscrowd"] == 0])
if len(gt_boxes) == 0:
continue
valid_gt_inds = (gt_areas >= area_range[0]) & (gt_areas <= area_range[1])
gt_boxes = gt_boxes[valid_gt_inds]
num_pos += len(gt_boxes)
if len(gt_boxes) == 0:
continue
if len(prediction) == 0:
continue
if limit is not None and len(prediction) > limit:
prediction = prediction[:limit]
overlaps = boxlist_iou(prediction, gt_boxes)
_gt_overlaps = torch.zeros(len(gt_boxes))
for j in range(min(len(prediction), len(gt_boxes))):
# find which proposal box maximally covers each gt box
# and get the iou amount of coverage for each gt box
max_overlaps, argmax_overlaps = overlaps.max(dim=0)
# find which gt box is 'best' covered (i.e. 'best' = most iou)
gt_ovr, gt_ind = max_overlaps.max(dim=0)
assert gt_ovr >= 0
# find the proposal box that covers the best covered gt box
box_ind = argmax_overlaps[gt_ind]
# record the iou coverage of this gt box
_gt_overlaps[j] = overlaps[box_ind, gt_ind]
assert _gt_overlaps[j] == gt_ovr
# mark the proposal box and the gt box as used
overlaps[box_ind, :] = -1
overlaps[:, gt_ind] = -1
# append recorded iou coverage level
gt_overlaps.append(_gt_overlaps)
if len(gt_overlaps) == 0:
return {
"ar": torch.zeros(1),
"recalls": torch.zeros(1),
"thresholds": thresholds,
"gt_overlaps": gt_overlaps,
"num_pos": num_pos,
}
gt_overlaps = torch.cat(gt_overlaps, dim=0)
gt_overlaps, _ = torch.sort(gt_overlaps)
if thresholds is None:
step = 0.05
thresholds = torch.arange(0.5, 0.95 + 1e-5, step, dtype=torch.float32)
recalls = torch.zeros_like(thresholds)
# compute recall for each iou threshold
for i, t in enumerate(thresholds):
recalls[i] = (gt_overlaps >= t).float().sum() / float(num_pos)
# ar = 2 * np.trapz(recalls, thresholds)
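    # COCO-style average recall: the mean of the recalls over the IoU
    # thresholds 0.5:0.05:0.95 computed above.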
ar = recalls.mean()
return {
"ar": ar,
"recalls": recalls,
"thresholds": thresholds,
"gt_overlaps": gt_overlaps,
"num_pos": num_pos,
}
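# Example usage (a sketch; same `predictions`/`dataset` assumptions as above):
#
#     stats = evaluate_box_proposals(predictions, dataset, area="all", limit=1000)
#     print("AR@1000: {:.4f}".format(stats["ar"].item()))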
def evaluate_predictions_on_coco(
coco_gt, coco_results, json_result_file, iou_type="bbox"
):
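    """Write `coco_results` to `json_result_file`, run COCOeval on it against
    `coco_gt`, and return the evaluated COCOeval object."""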
with open(json_result_file, "w") as f:
json.dump(coco_results, f)
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
coco_dt = coco_gt.loadRes(str(json_result_file)) if coco_results else COCO()
# coco_dt = coco_gt.loadRes(coco_results)
if iou_type == 'keypoints':
coco_gt = filter_valid_keypoints(coco_gt, coco_dt)
coco_eval = COCOeval(coco_gt, coco_dt, iou_type)
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()
if iou_type == 'bbox':
summarize_per_category(coco_eval, json_result_file.replace('.json', '.csv'))
return coco_eval
def summarize_per_category(coco_eval, csv_output=None):
    '''
    Compute and display per-category summary metrics for evaluation results.
    Note this function can *only* be applied with the default parameter settings.
    '''
def _summarize(iouThr=None, areaRng='all', maxDets=100):
p = coco_eval.params
titleStr = 'Average Precision'
typeStr = '(AP)'
iouStr = '{:0.2f}:{:0.2f}'.format(p.iouThrs[0], p.iouThrs[-1]) \
if iouThr is None else '{:0.2f}'.format(iouThr)
result_str = ' {:<18} {} @[ IoU={:<9} | area={:>6s} | maxDets={:>3d} ], '. \
format(titleStr, typeStr, iouStr, areaRng, maxDets)
aind = [i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng]
mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets]
# dimension of precision: [TxRxKxAxM]
s = coco_eval.eval['precision']
# IoU
if iouThr is not None:
t = np.where(iouThr == p.iouThrs)[0]
s = s[t]
s = s[:, :, :, aind, mind]
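        # Overall mean over valid (> -1) entries, mirroring COCOeval.summarize;
        # computed for reference but not written into the per-category row below.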
if len(s[s > -1]) == 0:
mean_s = -1
else:
mean_s = np.mean(s[s > -1])
        # calculate AP (average precision) for each category
        num_classes = len(p.catIds)
        avg_ap = 0.0
        for i in range(num_classes):
            result_str += '{}, '.format(np.mean(s[:, :, i, :]))
            avg_ap += np.mean(s[:, :, i, :])
        result_str += '{} \n'.format(avg_ap / num_classes)
return result_str
    id2name = {cat['id']: cat['name'] for cat in coco_eval.cocoGt.cats.values()}
title_str = 'metric, '
for cid in coco_eval.params.catIds:
title_str += '{}, '.format(id2name[cid])
title_str += 'avg \n'
results = [title_str]
results.append(_summarize())
results.append(_summarize(iouThr=.5, maxDets=coco_eval.params.maxDets[2]))
results.append(_summarize(areaRng='small', maxDets=coco_eval.params.maxDets[2]))
results.append(_summarize(areaRng='medium', maxDets=coco_eval.params.maxDets[2]))
results.append(_summarize(areaRng='large', maxDets=coco_eval.params.maxDets[2]))
    if csv_output:
        with open(csv_output, 'w') as f:
            for result in results:
                f.write(result)
def filter_valid_keypoints(coco_gt, coco_dt):
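    """Mask out ground-truth keypoints that the model never predicts: each GT
    visibility flag is multiplied by the matching slot of the first detection's
    keypoints (pycocotools' loadRes numbers result annotations from 1, so
    anns[1] is the first detection)."""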
kps = coco_dt.anns[1]['keypoints']
for id, ann in coco_gt.anns.items():
ann['keypoints'][2::3] = [a * b for a, b in zip(ann['keypoints'][2::3], kps[2::3])]
ann['num_keypoints'] = sum(ann['keypoints'][2::3])
return coco_gt
class COCOResults(object):
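    """Accumulates COCO-style metrics per IoU type; every metric starts at -1
    and is filled in by update() from a finished COCOeval."""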
METRICS = {
"bbox": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
"segm": ["AP", "AP50", "AP75", "APs", "APm", "APl"],
"box_proposal": [
"AR@100",
"ARs@100",
"ARm@100",
"ARl@100",
"AR@1000",
"ARs@1000",
"ARm@1000",
"ARl@1000",
],
"keypoints": ["AP", "AP50", "AP75", "APm", "APl"],
}
def __init__(self, *iou_types):
allowed_types = ("box_proposal", "bbox", "segm", "keypoints")
assert all(iou_type in allowed_types for iou_type in iou_types)
results = OrderedDict()
for iou_type in iou_types:
results[iou_type] = OrderedDict(
[(metric, -1) for metric in COCOResults.METRICS[iou_type]]
)
self.results = results
def update(self, coco_eval):
if coco_eval is None:
return
from pycocotools.cocoeval import COCOeval
assert isinstance(coco_eval, COCOeval)
s = coco_eval.stats
iou_type = coco_eval.params.iouType
res = self.results[iou_type]
metrics = COCOResults.METRICS[iou_type]
for idx, metric in enumerate(metrics):
res[metric] = s[idx]
def __repr__(self):
# TODO make it pretty
return repr(self.results)
def check_expected_results(results, expected_results, sigma_tol):
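    """Log a PASS/FAIL line for each expected result. `expected_results` is an
    iterable of (task, metric, (mean, std)) tuples, e.g. ("bbox", "AP",
    (0.369, 0.001)) with hypothetical values; a metric passes when it falls
    within sigma_tol standard deviations of the given mean."""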
if not expected_results:
return
logger = logging.getLogger("maskrcnn_benchmark.inference")
for task, metric, (mean, std) in expected_results:
actual_val = results.results[task][metric]
lo = mean - sigma_tol * std
hi = mean + sigma_tol * std
        ok = lo < actual_val < hi
        msg = (
            "{} > {} sanity check (actual vs. expected): "
            "{:.3f} vs. mean={:.4f}, std={:.4f}, range=({:.4f}, {:.4f})"
        ).format(task, metric, actual_val, mean, std, lo, hi)
if not ok:
msg = "FAIL: " + msg
logger.error(msg)
else:
msg = "PASS: " + msg
logger.info(msg)