# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
from collections import defaultdict
from typing import Tuple
import numpy as np
import torch
from mmengine.model import BaseModule
from mmengine.runner.checkpoint import load_checkpoint
from scipy.optimize import linear_sum_assignment
from torch import Tensor, nn
from mmdet.registry import TASK_UTILS
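
# A large cost marking track pairs that must not be linked.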
INFINITY = 1e5


class TemporalBlock(BaseModule):
    """The temporal block of AFLink model.

    Args:
        in_channel (int): the dimension of the input channels.
        out_channel (int): the dimension of the output channels.
        kernel_size (tuple): the kernel size of the temporal convolution.
            Defaults to (7, 1).
    """

def __init__(self,
in_channel: int,
out_channel: int,
kernel_size: tuple = (7, 1)):
super(TemporalBlock, self).__init__()
self.conv = nn.Conv2d(in_channel, out_channel, kernel_size, bias=False)
self.relu = nn.ReLU(inplace=True)
self.bnf = nn.BatchNorm1d(out_channel)
self.bnx = nn.BatchNorm1d(out_channel)
        self.bny = nn.BatchNorm1d(out_channel)

    def bn(self, x: Tensor) -> Tensor:
        # The last axis indexes the (frame, x, y) features; each feature is
        # normalized by its own BatchNorm1d so their statistics stay separate.
x[:, :, :, 0] = self.bnf(x[:, :, :, 0])
x[:, :, :, 1] = self.bnx(x[:, :, :, 1])
x[:, :, :, 2] = self.bny(x[:, :, :, 2])
        return x

    def forward(self, x: Tensor) -> Tensor:
x = self.conv(x)
x = self.bn(x)
x = self.relu(x)
        return x


class FusionBlock(BaseModule):
    """The fusion block of AFLink model.

    Args:
        in_channel (int): the dimension of the input channels.
        out_channel (int): the dimension of the output channels.
    """

def __init__(self, in_channel: int, out_channel: int):
super(FusionBlock, self).__init__()
self.conv = nn.Conv2d(in_channel, out_channel, (1, 3), bias=False)
self.bn = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
x = self.conv(x)
x = self.bn(x)
x = self.relu(x)
        return x


class Classifier(BaseModule):
    """The classifier of AFLink model.

    Args:
        in_channel (int): the dimension of the input channels.
        out_channel (int): the dimension of the output channels.
    """

def __init__(self, in_channel: int, out_channel: int):
super(Classifier, self).__init__()
self.fc1 = nn.Linear(in_channel * 2, in_channel // 2)
self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(in_channel // 2, out_channel)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
x = torch.cat((x1, x2), dim=1)
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
        return x


class AFLinkModel(BaseModule):
    """Appearance-Free Link Model.

    Args:
        temporal_module_channels (list): the channel dimensions of the
            stacked temporal blocks. Defaults to [1, 32, 64, 128, 256].
        fusion_module_channels (list): the input and output channels of
            the fusion blocks. Defaults to [256, 256].
        classifier_channels (list): the input and output channels of the
            classifier. Defaults to [256, 2].
    """

def __init__(self,
temporal_module_channels: list = [1, 32, 64, 128, 256],
fusion_module_channels: list = [256, 256],
classifier_channels: list = [256, 2]):
super(AFLinkModel, self).__init__()
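        # Each tracklet is encoded by its own temporal + fusion branch; the
        # classifier then compares the two pooled embeddings.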
self.TemporalModule_1 = nn.Sequential(*[
TemporalBlock(temporal_module_channels[i],
temporal_module_channels[i + 1])
for i in range(len(temporal_module_channels) - 1)
])
self.TemporalModule_2 = nn.Sequential(*[
TemporalBlock(temporal_module_channels[i],
temporal_module_channels[i + 1])
for i in range(len(temporal_module_channels) - 1)
])
self.FusionBlock_1 = FusionBlock(*fusion_module_channels)
self.FusionBlock_2 = FusionBlock(*fusion_module_channels)
self.pooling = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = Classifier(*classifier_channels)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
assert not self.training, 'Only testing is supported for AFLink.'
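        # Keep only the (frame, x, y) columns of the motion embeddings;
        # width and height are discarded.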
x1 = x1[:, :, :, :3]
x2 = x2[:, :, :, :3]
x1 = self.TemporalModule_1(x1) # [B,1,30,3] -> [B,256,6,3]
x2 = self.TemporalModule_2(x2)
x1 = self.FusionBlock_1(x1)
x2 = self.FusionBlock_2(x2)
x1 = self.pooling(x1).squeeze(-1).squeeze(-1)
x2 = self.pooling(x2).squeeze(-1).squeeze(-1)
y = self.classifier(x1, x2)
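        # Convert the logits to the probability that both tracklets belong
        # to the same identity.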
y = torch.softmax(y, dim=1)[0, 1]
        return y


@TASK_UTILS.register_module()
class AppearanceFreeLink(BaseModule):
    """Appearance-Free Link method.

    This method is proposed in
    "StrongSORT: Make DeepSORT Great Again"
    `StrongSORT <https://arxiv.org/abs/2202.13514>`_.

    Args:
        checkpoint (str): Checkpoint path.
        temporal_threshold (tuple, optional): The temporal constraint
            for tracklets association. Defaults to (0, 30).
        spatial_threshold (int, optional): The spatial constraint for
            tracklets association. Defaults to 75.
        confidence_threshold (float, optional): The minimum confidence
            threshold for tracklets association. Defaults to 0.95.
    """

def __init__(self,
checkpoint: str,
temporal_threshold: tuple = (0, 30),
spatial_threshold: int = 75,
confidence_threshold: float = 0.95):
super(AppearanceFreeLink, self).__init__()
self.temporal_threshold = temporal_threshold
self.spatial_threshold = spatial_threshold
self.confidence_threshold = confidence_threshold
self.model = AFLinkModel()
if checkpoint:
load_checkpoint(self.model, checkpoint)
if torch.cuda.is_available():
self.model.cuda()
self.model.eval()
self.device = next(self.model.parameters()).device
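        # Euclidean distance between two points, given their coordinate deltas.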
        self.fn_l2 = lambda x, y: np.sqrt(x**2 + y**2)

    def data_transform(self,
track1: np.ndarray,
track2: np.ndarray,
length: int = 30) -> Tuple[np.ndarray]:
"""Data Transformation. This is used to standardize the length of
tracks to a unified length. Then perform min-max normalization to the
motion embeddings.
Args:
track1 (ndarray): the first track with shape (N,C).
track2 (ndarray): the second track with shape (M,C).
length (int): the unified length of tracks. Defaults to 30.
Returns:
Tuple[ndarray]: the transformed track1 and track2.
"""
        # cut track1 to its last `length` rows, or pad zeros at the front if shorter
length_1 = track1.shape[0]
track1 = track1[-length:] if length_1 >= length else \
np.pad(track1, ((length - length_1, 0), (0, 0)))
        # cut track2 to its first `length` rows, or pad zeros at the end if shorter
length_2 = track2.shape[0]
track2 = track2[:length] if length_2 >= length else \
np.pad(track2, ((0, length - length_2), (0, 0)))
# min-max normalization
min_ = np.concatenate((track1, track2), axis=0).min(axis=0)
max_ = np.concatenate((track1, track2), axis=0).max(axis=0)
subtractor = (max_ + min_) / 2
divisor = (max_ - min_) / 2 + 1e-5
track1 = (track1 - subtractor) / divisor
track2 = (track2 - subtractor) / divisor
        return track1, track2

    def forward(self, pred_tracks: np.ndarray) -> np.ndarray:
"""Forward function.
pred_tracks (ndarray): With shape (N, 7). Each row denotes
(frame_id, track_id, x1, y1, x2, y2, score).
Returns:
ndarray: The linked tracks with shape (N, 7). Each row denotes
(frame_id, track_id, x1, y1, x2, y2, score)
"""
# sort tracks by the frame id
pred_tracks = pred_tracks[np.argsort(pred_tracks[:, 0])]
# gather tracks information
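        # (id2info maps each track id to its [frame_id, x1, y1, w, h] rows)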
id2info = defaultdict(list)
for row in pred_tracks:
frame_id, track_id, x1, y1, x2, y2 = row[:6]
id2info[track_id].append([frame_id, x1, y1, x2 - x1, y2 - y1])
id2info = {k: np.array(v) for k, v in id2info.items()}
num_track = len(id2info)
track_ids = np.array(list(id2info))
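        # cost of linking the end of track i to the start of track j;
        # INFINITY marks pairs that violate the constraints below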
cost_matrix = np.full((num_track, num_track), INFINITY)
# compute the cost matrix
for i, id_i in enumerate(track_ids):
for j, id_j in enumerate(track_ids):
if id_i == id_j:
continue
info_i, info_j = id2info[id_i], id2info[id_j]
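                # compare the last observation of track i with the first
                # observation of track j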
frame_i, box_i = info_i[-1][0], info_i[-1][1:3]
frame_j, box_j = info_j[0][0], info_j[0][1:3]
# temporal constraint
if not self.temporal_threshold[0] <= \
frame_j - frame_i <= self.temporal_threshold[1]:
continue
# spatial constraint
if self.fn_l2(box_i[0] - box_j[0], box_i[1] - box_j[1]) \
> self.spatial_threshold:
continue
# confidence constraint
track_i, track_j = self.data_transform(info_i, info_j)
# numpy to torch
track_i = torch.tensor(
track_i, dtype=torch.float).to(self.device)
track_j = torch.tensor(
track_j, dtype=torch.float).to(self.device)
track_i = track_i.unsqueeze(0).unsqueeze(0)
track_j = track_j.unsqueeze(0).unsqueeze(0)
confidence = self.model(track_i,
track_j).detach().cpu().numpy()
if confidence >= self.confidence_threshold:
cost_matrix[i, j] = 1 - confidence
# linear assignment
        indices = linear_sum_assignment(cost_matrix)
        _id2id = dict()  # the temporary assignment results
        id2id = dict()  # the final assignment results
        for i, j in zip(indices[0], indices[1]):
            if cost_matrix[i, j] < INFINITY:
                # map the matrix indices back to the actual track ids
                _id2id[track_ids[i]] = track_ids[j]
        # resolve chained links so that every linked tracklet maps to the
        # earliest track id in its chain
        for k, v in _id2id.items():
            if k in id2id:
                id2id[v] = id2id[k]
            else:
                id2id[v] = k
# link
for k, v in id2id.items():
pred_tracks[pred_tracks[:, 1] == k, 1] = v
# deduplicate
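        # (relinking may assign the same id to two boxes in one frame;
        # keep only the first occurrence of each (frame_id, track_id) pair)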
_, index = np.unique(pred_tracks[:, :2], return_index=True, axis=0)
return pred_tracks[index]
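

# A minimal usage sketch (for illustration only; the checkpoint path and the
# `pred_tracks` array below are placeholders, not shipped assets):
#
#   aflink = AppearanceFreeLink(
#       checkpoint='path/to/aflink_checkpoint.pth',
#       temporal_threshold=(0, 30),
#       spatial_threshold=75,
#       confidence_threshold=0.95)
#   # pred_tracks: (N, 7) ndarray of (frame_id, track_id, x1, y1, x2, y2, score)
#   linked_tracks = aflink.forward(pred_tracks)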