# rawalkhirodkar's picture
# Add initial commit
# 28c256d
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import copy
from abc import ABCMeta, abstractmethod
from inspect import signature
from typing import List, Optional, Tuple
import torch
from mmcv.ops import batched_nms
from mmengine.config import ConfigDict
from mmengine.model import BaseModule, constant_init
from mmengine.structures import InstanceData
from torch import Tensor
from mmdet.structures import SampleList
from mmdet.structures.bbox import (cat_boxes, get_box_tensor, get_box_wh,
scale_boxes)
from mmdet.utils import InstanceList, OptMultiConfig
from ..test_time_augs import merge_aug_results
from ..utils import (filter_scores_and_topk, select_single_mlvl,
unpack_gt_instances)
class BaseDenseHead(BaseModule, metaclass=ABCMeta):
    """Base class for DenseHeads.

    1. The ``init_weights`` method is used to initialize densehead's
    model parameters. After detector initialization, ``init_weights``
    is triggered when ``detector.init_weights()`` is called externally.

    2. The ``loss`` method is used to calculate the loss of densehead,
    which includes two steps: (1) the densehead model performs forward
    propagation to obtain the feature maps (2) The ``loss_by_feat`` method
    is called based on the feature maps to calculate the loss.

    .. code:: text

        loss(): forward() -> loss_by_feat()

    3. The ``predict`` method is used to predict detection results,
    which includes two steps: (1) the densehead model performs forward
    propagation to obtain the feature maps (2) The ``predict_by_feat`` method
    is called based on the feature maps to predict detection results including
    post-processing.

    .. code:: text

        predict(): forward() -> predict_by_feat()

    4. The ``loss_and_predict`` method is used to return loss and detection
    results at the same time. It will call densehead's ``forward``,
    ``loss_by_feat`` and ``predict_by_feat`` methods in order. If one-stage is
    used as RPN, the densehead needs to return both losses and predictions.
    These predictions are used as the proposals of roihead.

    .. code:: text

        loss_and_predict(): forward() -> loss_by_feat() -> predict_by_feat()
    """

    def __init__(self, init_cfg: OptMultiConfig = None) -> None:
        """Set up the head.

        Args:
            init_cfg (OptMultiConfig): Initialization config, forwarded
                unchanged to the parent constructor. Defaults to None.
        """
        super().__init__(init_cfg=init_cfg)
        # `_raw_positive_infos` will be used in `get_positive_infos`, which
        # can get positive information.
        self._raw_positive_infos = dict()

    def init_weights(self) -> None:
        """Initialize the weights.

        Runs the parent initialization first and then zero-initializes the
        ``conv_offset`` of every sub-module that has one.
        """
        super().init_weights()
        # avoid init_cfg overwrite the initialization of `conv_offset`
        for m in self.modules():
            # DeformConv2dPack, ModulatedDeformConv2dPack
            if hasattr(m, 'conv_offset'):
                constant_init(m.conv_offset, 0)

    def get_positive_infos(self) -> Optional[InstanceList]:
        """Get positive information from sampling results.

        Returns:
            list[:obj:`InstanceData`] or None: Positive information of each
            image, usually including positive bboxes, positive labels,
            positive priors, etc. ``None`` is returned when
            ``_raw_positive_infos`` is empty.
        """
        if len(self._raw_positive_infos) == 0:
            return None

        sampling_results = self._raw_positive_infos.get(
            'sampling_results', None)
        assert sampling_results is not None

        positive_infos = []
        # Iterate over the sampling results themselves. Wrapping them in
        # ``enumerate`` would yield ``(index, result)`` tuples, which do not
        # carry any of the ``pos_*`` attributes read below.
        for sampling_result in sampling_results:
            pos_info = InstanceData()
            pos_info.bboxes = sampling_result.pos_gt_bboxes
            pos_info.labels = sampling_result.pos_gt_labels
            pos_info.priors = sampling_result.pos_priors
            pos_info.pos_assigned_gt_inds = \
                sampling_result.pos_assigned_gt_inds
            pos_info.pos_inds = sampling_result.pos_inds
            positive_infos.append(pos_info)
        return positive_infos

    def loss(self, x: Tuple[Tensor], batch_data_samples: SampleList) -> dict:
        """Perform forward propagation and loss calculation of the detection
        head on the features of the upstream network.

        Args:
            x (tuple[Tensor]): Features from the upstream network, each is
                a 4D-tensor.
            batch_data_samples (List[:obj:`DetDataSample`]): The Data
                Samples. It usually includes information such as
                `gt_instance`, `gt_panoptic_seg` and `gt_sem_seg`.

        Returns:
            dict: A dictionary of loss components.
        """
        outs = self(x)

        outputs = unpack_gt_instances(batch_data_samples)
        (batch_gt_instances, batch_gt_instances_ignore,
         batch_img_metas) = outputs

        # The forward outputs are concatenated with the ground-truth
        # arguments, so ``outs`` is expected to be a tuple here.
        loss_inputs = outs + (batch_gt_instances, batch_img_metas,
                              batch_gt_instances_ignore)
        losses = self.loss_by_feat(*loss_inputs)
        return losses

    @abstractmethod
    def loss_by_feat(self, **kwargs) -> dict:
        """Calculate the loss based on the features extracted by the detection
        head."""
        pass

    def loss_and_predict(
        self,
        x: Tuple[Tensor],
        batch_data_samples: SampleList,
        proposal_cfg: Optional[ConfigDict] = None
    ) -> Tuple[dict, InstanceList]:
        """Perform forward propagation of the head, then calculate loss and
        predictions from the features and data samples.

        Args:
            x (tuple[Tensor]): Features from FPN.
            batch_data_samples (list[:obj:`DetDataSample`]): Each item contains
                the meta information of each image and corresponding
                annotations.
            proposal_cfg (ConfigDict, optional): Test / postprocessing
                configuration, if None, test_cfg would be used.
                Defaults to None.

        Returns:
            tuple: the return value is a tuple contains:

                - losses: (dict[str, Tensor]): A dictionary of loss components.
                - predictions (list[:obj:`InstanceData`]): Detection
                  results of each image after the post process.
        """
        outputs = unpack_gt_instances(batch_data_samples)
        (batch_gt_instances, batch_gt_instances_ignore,
         batch_img_metas) = outputs

        outs = self(x)

        loss_inputs = outs + (batch_gt_instances, batch_img_metas,
                              batch_gt_instances_ignore)
        losses = self.loss_by_feat(*loss_inputs)

        # The same forward outputs are reused for prediction.
        predictions = self.predict_by_feat(
            *outs, batch_img_metas=batch_img_metas, cfg=proposal_cfg)
        return losses, predictions

    def predict(self,
                x: Tuple[Tensor],
                batch_data_samples: SampleList,
                rescale: bool = False) -> InstanceList:
        """Perform forward propagation of the detection head and predict
        detection results on the features of the upstream network.

        Args:
            x (tuple[Tensor]): Multi-level features from the
                upstream network, each is a 4D-tensor.
            batch_data_samples (List[:obj:`DetDataSample`]): The Data
                Samples. It usually includes information such as
                `gt_instance`, `gt_panoptic_seg` and `gt_sem_seg`.
            rescale (bool, optional): Whether to rescale the results.
                Defaults to False.

        Returns:
            list[obj:`InstanceData`]: Detection results of each image
            after the post process.
        """
        batch_img_metas = [
            data_samples.metainfo for data_samples in batch_data_samples
        ]

        outs = self(x)

        predictions = self.predict_by_feat(
            *outs, batch_img_metas=batch_img_metas, rescale=rescale)
        return predictions

    def predict_by_feat(self,
                        cls_scores: List[Tensor],
                        bbox_preds: List[Tensor],
                        score_factors: Optional[List[Tensor]] = None,
                        batch_img_metas: Optional[List[dict]] = None,
                        cfg: Optional[ConfigDict] = None,
                        rescale: bool = False,
                        with_nms: bool = True) -> InstanceList:
        """Transform a batch of output features extracted from the head into
        bbox results.

        Note: When score_factors is not None, the cls_scores are
        usually multiplied by it then obtain the real score used in NMS,
        such as CenterNess in FCOS, IoU branch in ATSS.

        Args:
            cls_scores (list[Tensor]): Classification scores for all
                scale levels, each is a 4D-tensor, has shape
                (batch_size, num_priors * num_classes, H, W).
            bbox_preds (list[Tensor]): Box energies / deltas for all
                scale levels, each is a 4D-tensor, has shape
                (batch_size, num_priors * 4, H, W).
            score_factors (list[Tensor], optional): Score factor for
                all scale level, each is a 4D-tensor, has shape
                (batch_size, num_priors * 1, H, W). Defaults to None.
            batch_img_metas (list[dict], Optional): Batch image meta info.
                Defaults to None.
            cfg (ConfigDict, optional): Test / postprocessing
                configuration, if None, test_cfg would be used.
                Defaults to None.
            rescale (bool): If True, return boxes in original image space.
                Defaults to False.
            with_nms (bool): If True, do nms before return boxes.
                Defaults to True.

        Returns:
            list[:obj:`InstanceData`]: Object detection results of each image
            after the post process. Each item usually contains following keys.

                - scores (Tensor): Classification scores, has a shape
                  (num_instance, )
                - labels (Tensor): Labels of bboxes, has a shape
                  (num_instances, ).
                - bboxes (Tensor): Has a shape (num_instances, 4),
                  the last dimension 4 arrange as (x1, y1, x2, y2).
        """
        assert len(cls_scores) == len(bbox_preds)

        if score_factors is None:
            # e.g. Retina, FreeAnchor, Foveabox, etc.
            with_score_factors = False
        else:
            # e.g. FCOS, PAA, ATSS, AutoAssign, etc.
            with_score_factors = True
            assert len(cls_scores) == len(score_factors)

        num_levels = len(cls_scores)

        # Priors are generated once per batch from the spatial size of each
        # level and shared by every image.
        featmap_sizes = [cls_score.shape[-2:] for cls_score in cls_scores]
        mlvl_priors = self.prior_generator.grid_priors(
            featmap_sizes,
            dtype=cls_scores[0].dtype,
            device=cls_scores[0].device)

        result_list = []

        for img_id, img_meta in enumerate(batch_img_metas):
            cls_score_list = select_single_mlvl(
                cls_scores, img_id, detach=True)
            bbox_pred_list = select_single_mlvl(
                bbox_preds, img_id, detach=True)
            if with_score_factors:
                score_factor_list = select_single_mlvl(
                    score_factors, img_id, detach=True)
            else:
                # Placeholder so the per-level zip in
                # `_predict_by_feat_single` still has one entry per level.
                score_factor_list = [None for _ in range(num_levels)]

            results = self._predict_by_feat_single(
                cls_score_list=cls_score_list,
                bbox_pred_list=bbox_pred_list,
                score_factor_list=score_factor_list,
                mlvl_priors=mlvl_priors,
                img_meta=img_meta,
                cfg=cfg,
                rescale=rescale,
                with_nms=with_nms)
            result_list.append(results)
        return result_list

    def _predict_by_feat_single(self,
                                cls_score_list: List[Tensor],
                                bbox_pred_list: List[Tensor],
                                score_factor_list: List[Tensor],
                                mlvl_priors: List[Tensor],
                                img_meta: dict,
                                cfg: ConfigDict,
                                rescale: bool = False,
                                with_nms: bool = True) -> InstanceData:
        """Transform a single image's features extracted from the head into
        bbox results.

        Args:
            cls_score_list (list[Tensor]): Box scores from all scale
                levels of a single image, each item has shape
                (num_priors * num_classes, H, W).
            bbox_pred_list (list[Tensor]): Box energies / deltas from
                all scale levels of a single image, each item has shape
                (num_priors * 4, H, W).
            score_factor_list (list[Tensor]): Score factor from all scale
                levels of a single image, each item has shape
                (num_priors * 1, H, W).
            mlvl_priors (list[Tensor]): Each element in the list is
                the priors of a single level in feature pyramid. In all
                anchor-based methods, it has shape (num_priors, 4). In
                all anchor-free methods, it has shape (num_priors, 2)
                when `with_stride=True`, otherwise it still has shape
                (num_priors, 4).
            img_meta (dict): Image meta info.
            cfg (mmengine.Config): Test / postprocessing configuration,
                if None, test_cfg would be used.
            rescale (bool): If True, return boxes in original image space.
                Defaults to False.
            with_nms (bool): If True, do nms before return boxes.
                Defaults to True.

        Returns:
            :obj:`InstanceData`: Detection results of each image
            after the post process.
            Each item usually contains following keys.

                - scores (Tensor): Classification scores, has a shape
                  (num_instance, )
                - labels (Tensor): Labels of bboxes, has a shape
                  (num_instances, ).
                - bboxes (Tensor): Has a shape (num_instances, 4),
                  the last dimension 4 arrange as (x1, y1, x2, y2).
        """
        if score_factor_list[0] is None:
            # e.g. Retina, FreeAnchor, etc.
            with_score_factors = False
        else:
            # e.g. FCOS, PAA, ATSS, etc.
            with_score_factors = True

        cfg = self.test_cfg if cfg is None else cfg
        # Work on a private copy of the config.
        cfg = copy.deepcopy(cfg)
        img_shape = img_meta['img_shape']
        nms_pre = cfg.get('nms_pre', -1)

        mlvl_bbox_preds = []
        mlvl_valid_priors = []
        mlvl_scores = []
        mlvl_labels = []
        if with_score_factors:
            mlvl_score_factors = []
        else:
            mlvl_score_factors = None

        for cls_score, bbox_pred, score_factor, priors in zip(
                cls_score_list, bbox_pred_list, score_factor_list,
                mlvl_priors):

            assert cls_score.size()[-2:] == bbox_pred.size()[-2:]

            # Flatten every map from (C, H, W) to one row per prior.
            dim = self.bbox_coder.encode_size
            bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, dim)
            if with_score_factors:
                score_factor = score_factor.permute(1, 2,
                                                    0).reshape(-1).sigmoid()
            cls_score = cls_score.permute(1, 2,
                                          0).reshape(-1, self.cls_out_channels)

            # the `custom_cls_channels` parameter is derived from
            # CrossEntropyCustomLoss and FocalCustomLoss, and is currently used
            # in v3det.
            if getattr(self.loss_cls, 'custom_cls_channels', False):
                scores = self.loss_cls.get_activation(cls_score)
            elif self.use_sigmoid_cls:
                scores = cls_score.sigmoid()
            else:
                # remind that we set FG labels to [0, num_class-1]
                # since mmdet v2.0
                # BG cat_id: num_class
                scores = cls_score.softmax(-1)[:, :-1]

            # After https://github.com/open-mmlab/mmdetection/pull/6268/,
            # this operation keeps fewer bboxes under the same `nms_pre`.
            # There is no difference in performance for most models. If you
            # find a slight drop in performance, you can set a larger
            # `nms_pre` than before.
            score_thr = cfg.get('score_thr', 0)

            results = filter_scores_and_topk(
                scores, score_thr, nms_pre,
                dict(bbox_pred=bbox_pred, priors=priors))
            scores, labels, keep_idxs, filtered_results = results

            bbox_pred = filtered_results['bbox_pred']
            priors = filtered_results['priors']

            if with_score_factors:
                score_factor = score_factor[keep_idxs]

            mlvl_bbox_preds.append(bbox_pred)
            mlvl_valid_priors.append(priors)
            mlvl_scores.append(scores)
            mlvl_labels.append(labels)

            if with_score_factors:
                mlvl_score_factors.append(score_factor)

        # Decode once over the concatenation of all levels.
        bbox_pred = torch.cat(mlvl_bbox_preds)
        priors = cat_boxes(mlvl_valid_priors)
        bboxes = self.bbox_coder.decode(priors, bbox_pred, max_shape=img_shape)

        results = InstanceData()
        results.bboxes = bboxes
        results.scores = torch.cat(mlvl_scores)
        results.labels = torch.cat(mlvl_labels)
        if with_score_factors:
            results.score_factors = torch.cat(mlvl_score_factors)

        return self._bbox_post_process(
            results=results,
            cfg=cfg,
            rescale=rescale,
            with_nms=with_nms,
            img_meta=img_meta)

    def _bbox_post_process(self,
                           results: InstanceData,
                           cfg: ConfigDict,
                           rescale: bool = False,
                           with_nms: bool = True,
                           img_meta: Optional[dict] = None) -> InstanceData:
        """bbox post-processing method.

        The boxes would be rescaled to the original image scale and do
        the nms operation. Usually `with_nms` is False is used for aug test.

        Args:
            results (:obj:`InstanceData`): Detection instance results,
                each item has shape (num_bboxes, ).
            cfg (ConfigDict): Test / postprocessing configuration. This
                method reads ``min_bbox_size``, ``nms`` and ``max_per_img``
                from it.
            rescale (bool): If True, return boxes in original image space.
                Requires ``img_meta['scale_factor']``. Default to False.
            with_nms (bool): If True, do nms before return boxes.
                Default to True.
            img_meta (dict, optional): Image meta info. Defaults to None.

        Returns:
            :obj:`InstanceData`: Detection results of each image
            after the post process.
            Each item usually contains following keys.

                - scores (Tensor): Classification scores, has a shape
                  (num_instance, )
                - labels (Tensor): Labels of bboxes, has a shape
                  (num_instances, ).
                - bboxes (Tensor): Has a shape (num_instances, 4),
                  the last dimension 4 arrange as (x1, y1, x2, y2).
        """
        if rescale:
            assert img_meta.get('scale_factor') is not None
            # Boxes are scaled by the reciprocal of the recorded factor.
            scale_factor = [1 / s for s in img_meta['scale_factor']]
            results.bboxes = scale_boxes(results.bboxes, scale_factor)

        if hasattr(results, 'score_factors'):
            # TODO: Add sqrt operation in order to be consistent with
            #  the paper.
            score_factors = results.pop('score_factors')
            results.scores = results.scores * score_factors

        # filter small size bboxes
        if cfg.get('min_bbox_size', -1) >= 0:
            w, h = get_box_wh(results.bboxes)
            valid_mask = (w > cfg.min_bbox_size) & (h > cfg.min_bbox_size)
            if not valid_mask.all():
                results = results[valid_mask]

        # TODO: deal with `with_nms` and `nms_cfg=None` in test_cfg
        if with_nms and results.bboxes.numel() > 0:
            bboxes = get_box_tensor(results.bboxes)
            det_bboxes, keep_idxs = batched_nms(bboxes, results.scores,
                                                results.labels, cfg.nms)
            results = results[keep_idxs]
            # some nms would reweight the score, such as softnms
            results.scores = det_bboxes[:, -1]
            results = results[:cfg.max_per_img]

        return results

    def aug_test(self,
                 aug_batch_feats,
                 aug_batch_img_metas,
                 rescale=False,
                 with_ori_nms=False,
                 **kwargs):
        """Test function with test time augmentation.

        Args:
            aug_batch_feats (list[tuple[Tensor]]): The outer list
                indicates test-time augmentations and inner tuple
                indicate the multi-level feats from
                FPN, each Tensor should have a shape (B, C, H, W),
            aug_batch_img_metas (list[list[dict]]): Meta information
                of images under the different test-time augs
                (multiscale, flip, etc.). The outer list indicates
                the test-time augmentations and the inner list indicates
                the images of a batch.
            rescale (bool, optional): Whether to rescale the results.
                Defaults to False.
            with_ori_nms (bool): Whether execute the nms in original head.
                Defaults to False. It will be `True` when the head is
                adopted as `rpn_head`.

        Returns:
            list(obj:`InstanceData`): Detection results of the
            input images. Each item usually contains
            following keys.

                - scores (Tensor): Classification scores, has a shape
                  (num_instance,)
                - labels (Tensor): Labels of bboxes, has a shape
                  (num_instances,).
                - bboxes (Tensor): Has a shape (num_instances, 4),
                  the last dimension 4 arrange as (x1, y1, x2, y2).
        """
        # TODO: remove this for detr and deformdetr
        # NOTE(review): `get_results` and `_get_results_single` are not
        # defined in this class, so they have to be provided by a subclass.
        # The corresponding methods here are named `predict_by_feat` and
        # `_predict_by_feat_single` - confirm which names are intended.
        sig_of_get_results = signature(self.get_results)
        get_results_args = [
            p.name for p in sig_of_get_results.parameters.values()
        ]
        get_results_single_sig = signature(self._get_results_single)
        get_results_single_sig_args = [
            p.name for p in get_results_single_sig.parameters.values()
        ]
        assert ('with_nms' in get_results_args) and \
               ('with_nms' in get_results_single_sig_args), \
            f'{self.__class__.__name__} does not support ' \
            'test-time augmentation'

        num_imgs = len(aug_batch_img_metas[0])
        aug_batch_results = []
        for x, img_metas in zip(aug_batch_feats, aug_batch_img_metas):
            outs = self.forward(x)
            batch_instance_results = self.get_results(
                *outs,
                img_metas=img_metas,
                cfg=self.test_cfg,
                rescale=False,
                with_nms=with_ori_nms,
                **kwargs)
            aug_batch_results.append(batch_instance_results)

        # after merging, bboxes will be rescaled to the original image
        batch_results = merge_aug_results(aug_batch_results,
                                          aug_batch_img_metas)

        final_results = []
        for img_id in range(num_imgs):
            results = batch_results[img_id]
            det_bboxes, keep_idxs = batched_nms(results.bboxes, results.scores,
                                                results.labels,
                                                self.test_cfg.nms)
            results = results[keep_idxs]
            # some nms operation may reweight the score such as softnms
            results.scores = det_bboxes[:, -1]
            results = results[:self.test_cfg.max_per_img]
            # When `rescale` is True nothing more is needed: all results
            # have been mapped to the original scale in `merge_aug_results`.
            if not rescale:
                # map to the first aug image scale
                scale_factor = results.bboxes.new_tensor(
                    aug_batch_img_metas[0][img_id]['scale_factor'])
                results.bboxes = \
                    results.bboxes * scale_factor

            final_results.append(results)

        return final_results