# coding: utf-8
__author__ = 'ZFTurbo: https://kaggle.com/zfturbo'

"""
Method described in:
CAD: Scale Invariant Framework for Real-Time Object Detection
http://openaccess.thecvf.com/content_ICCV_2017_workshops/papers/w14/Zhou_CAD_Scale_Invariant_ICCV_2017_paper.pdf
"""

import warnings

import numpy as np

try:
    from numba import jit
except ImportError:
    # numba only accelerates the IoU helper; without it the same code runs
    # as plain Python, so provide a decorator that leaves functions untouched.
    def jit(*args, **kwargs):
        """No-op stand-in for ``numba.jit`` (supports ``@jit`` and ``@jit(...)``)."""
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]
        return lambda func: func


@jit(nopython=True)
def bb_intersection_over_union(A, B):
    """
    Compute intersection over union of two boxes.

    :param A: first box as (x1, y1, x2, y2)
    :param B: second box as (x1, y1, x2, y2)
    :return: IoU value, 0.0 when the boxes do not intersect
    """
    xA = max(A[0], B[0])
    yA = max(A[1], B[1])
    xB = min(A[2], B[2])
    yB = min(A[3], B[3])

    # compute the area of intersection rectangle
    interArea = max(0, xB - xA) * max(0, yB - yA)

    if interArea == 0:
        return 0.0

    # compute the area of both rectangles
    boxAArea = (A[2] - A[0]) * (A[3] - A[1])
    boxBArea = (B[2] - B[0]) * (B[3] - B[1])

    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou


def _clip_to_unit_range(value, name):
    """Clamp one coordinate to [0, 1], warning when it had to be changed."""
    if value < 0:
        warnings.warn('{} < 0 in box. Set it to 0.'.format(name))
        return 0.0
    if value > 1:
        warnings.warn(
            '{} > 1 in box. Set it to 1. Check that you normalize boxes in [0, 1] range.'.format(name)
        )
        return 1.0
    return value


def prefilter_boxes(boxes, scores, labels, weights, thr):
    """
    Validate, clean and group the boxes of all models by label.

    Boxes with score below ``thr`` and boxes with zero area (after clamping)
    are dropped. Swapped corners are put back in order and coordinates are
    clamped to [0, 1]; each of these corrections emits a warning.

    :param boxes: per-model lists of boxes (x1, y1, x2, y2)
    :param scores: per-model lists of scores
    :param labels: per-model lists of labels
    :param weights: per-model weight, multiplied into every score of that model
    :param thr: boxes with raw score lower than this value are skipped
    :return: dict mapping label -> array of rows
        (label, weighted score, x1, y1, x2, y2) sorted by score, descending
    :raises ValueError: if a model has a different number of boxes than
        scores or labels
    """
    # Create dict with boxes stored by its label
    new_boxes = dict()

    for t, model_boxes in enumerate(boxes):
        # Raise instead of print + exit(): library code must not terminate
        # the interpreter of whoever called it.
        if len(model_boxes) != len(scores[t]):
            raise ValueError(
                'Length of boxes arrays not equal to length of scores array: {} != {}'.format(
                    len(model_boxes), len(scores[t])
                )
            )
        if len(model_boxes) != len(labels[t]):
            raise ValueError(
                'Length of boxes arrays not equal to length of labels array: {} != {}'.format(
                    len(model_boxes), len(labels[t])
                )
            )

        for j, box_part in enumerate(model_boxes):
            score = scores[t][j]
            if score < thr:
                continue
            label = int(labels[t][j])
            x1 = float(box_part[0])
            y1 = float(box_part[1])
            x2 = float(box_part[2])
            y2 = float(box_part[3])

            # Box data checks
            if x2 < x1:
                warnings.warn('X2 < X1 value in box. Swap them.')
                x1, x2 = x2, x1
            if y2 < y1:
                warnings.warn('Y2 < Y1 value in box. Swap them.')
                y1, y2 = y2, y1
            x1 = _clip_to_unit_range(x1, 'X1')
            x2 = _clip_to_unit_range(x2, 'X2')
            y1 = _clip_to_unit_range(y1, 'Y1')
            y2 = _clip_to_unit_range(y2, 'Y2')
            if (x2 - x1) * (y2 - y1) == 0.0:
                warnings.warn("Zero area box skipped: {}.".format(box_part))
                continue

            b = [int(label), float(score) * weights[t], x1, y1, x2, y2]
            new_boxes.setdefault(label, []).append(b)

    # Sort each list in dict by score and transform it to numpy array
    for k in new_boxes:
        current_boxes = np.array(new_boxes[k])
        new_boxes[k] = current_boxes[current_boxes[:, 1].argsort()[::-1]]

    return new_boxes


def get_weighted_box(boxes):
    """
    Create weighted box for set of boxes

    The first box of the set is treated as the best one: it supplies the
    label and the score of the result. Coordinates are the average of all
    boxes, each weighted by ``score * IoU(box, best box)``.

    :param boxes: set of boxes to fuse, rows of (label, score, x1, y1, x2, y2)
    :return: weighted box as float32 array (label, score, x1, y1, x2, y2)
    """
    box = np.zeros(6, dtype=np.float32)
    best_box = boxes[0]
    conf = 0
    for b in boxes:
        iou = bb_intersection_over_union(b[2:], best_box[2:])
        weight = b[1] * iou
        box[2:] += (weight * b[2:])
        conf += weight
    box[0] = best_box[0]
    box[1] = best_box[1]
    if conf != 0:
        box[2:] /= conf
    else:
        # Every weight was zero (e.g. all scores are 0.0), so a weighted
        # average is undefined; keep the best box instead of producing NaN.
        box[2:] = best_box[2:]
    return box


def find_matching_box(boxes_list, new_box, match_iou):
    """
    Find the box of the same label that overlaps ``new_box`` the most.

    :param boxes_list: candidate boxes, rows of (label, score, x1, y1, x2, y2)
    :param new_box: box to match, same layout
    :param match_iou: IoU a candidate has to exceed to count as a match
    :return: (index of best match or -1, IoU of best match or ``match_iou``)
    """
    best_iou = match_iou
    best_index = -1
    for i, box in enumerate(boxes_list):
        if box[0] != new_box[0]:
            continue
        iou = bb_intersection_over_union(box[2:], new_box[2:])
        if iou > best_iou:
            best_index = i
            best_iou = iou

    return best_index, best_iou


def non_maximum_weighted(boxes_list, scores_list, labels_list, weights=None, iou_thr=0.55, skip_box_thr=0.0):
    '''
    :param boxes_list: list of boxes predictions from each model, each box is 4 numbers.
    It has 3 dimensions (models_number, model_preds, 4)
    Order of boxes: x1, y1, x2, y2. We expect float normalized coordinates [0; 1]
    :param scores_list: list of scores for each model
    :param labels_list: list of labels for each model
    :param weights: list of weights for each model. Default: None, which means weight == 1 for each model
    :param iou_thr: IoU value for boxes to be a match
    :param skip_box_thr: exclude boxes with score lower than this variable

    :return: boxes: boxes coordinates (Order of boxes: x1, y1, x2, y2).
    :return: scores: confidence scores
    :return: labels: boxes labels
    :raises ValueError: if a model has a different number of boxes than scores or labels
    '''

    if weights is None:
        weights = np.ones(len(boxes_list))
    if len(weights) != len(boxes_list):
        warnings.warn(
            'Incorrect number of weights {}. Must be: {}. Set weights equal to 1.'.format(
                len(weights), len(boxes_list)
            )
        )
        weights = np.ones(len(boxes_list))
    # Normalize so the strongest model keeps its scores unchanged
    weights = np.array(weights) / max(weights)

    filtered_boxes = prefilter_boxes(boxes_list, scores_list, labels_list, weights, skip_box_thr)
    if len(filtered_boxes) == 0:
        return np.zeros((0, 4)), np.zeros((0,)), np.zeros((0,))

    overall_boxes = []
    for label in filtered_boxes:
        boxes = filtered_boxes[label]
        clusters = []
        main_boxes = []

        # Clusterize boxes: boxes arrive sorted by score, so the first box of
        # every cluster is its highest scoring one and acts as representative
        for candidate in boxes:
            index, best_iou = find_matching_box(main_boxes, candidate, iou_thr)
            if index != -1:
                clusters[index].append(candidate.copy())
            else:
                clusters.append([candidate.copy()])
                main_boxes.append(candidate.copy())

        weighted_boxes = [get_weighted_box(cluster) for cluster in clusters]
        overall_boxes.append(np.array(weighted_boxes))

    overall_boxes = np.concatenate(overall_boxes, axis=0)
    overall_boxes = overall_boxes[overall_boxes[:, 1].argsort()[::-1]]
    boxes = overall_boxes[:, 2:]
    scores = overall_boxes[:, 1]
    labels = overall_boxes[:, 0]
    return boxes, scores, labels