# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
from collections import defaultdict
from typing import Tuple

import numpy as np
import torch
from mmengine.model import BaseModule
from mmengine.runner.checkpoint import load_checkpoint
from scipy.optimize import linear_sum_assignment
from torch import Tensor, nn

from mmdet.registry import TASK_UTILS

INFINITY = 1e5


class TemporalBlock(BaseModule):
    """The temporal block of AFLink model.

    Args:
        in_channel (int): the dimension of the input channels.
        out_channel (int): the dimension of the output channels.
        kernel_size (tuple): the kernel size of the convolution.
            Defaults to (7, 1).
    """

    def __init__(self,
                 in_channel: int,
                 out_channel: int,
                 kernel_size: tuple = (7, 1)):
        super(TemporalBlock, self).__init__()
        self.conv = nn.Conv2d(
            in_channel, out_channel, kernel_size, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.bnf = nn.BatchNorm1d(out_channel)
        self.bnx = nn.BatchNorm1d(out_channel)
        self.bny = nn.BatchNorm1d(out_channel)

    def bn(self, x: Tensor) -> Tensor:
        # normalize the frame, x and y components with separate
        # BatchNorm1d layers
        x[:, :, :, 0] = self.bnf(x[:, :, :, 0])
        x[:, :, :, 1] = self.bnx(x[:, :, :, 1])
        x[:, :, :, 2] = self.bny(x[:, :, :, 2])
        return x

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x


class FusionBlock(BaseModule):
    """The fusion block of AFLink model.

    Args:
        in_channel (int): the dimension of the input channels.
        out_channel (int): the dimension of the output channels.
    """

    def __init__(self, in_channel: int, out_channel: int):
        super(FusionBlock, self).__init__()
        self.conv = nn.Conv2d(in_channel, out_channel, (1, 3), bias=False)
        self.bn = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x


class Classifier(BaseModule):
    """The classifier of AFLink model.

    Args:
        in_channel (int): the dimension of the input channels.
        out_channel (int): the dimension of the output channels.
    """

    def __init__(self, in_channel: int, out_channel: int):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(in_channel * 2, in_channel // 2)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(in_channel // 2, out_channel)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
        x = torch.cat((x1, x2), dim=1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


class AFLinkModel(BaseModule):
    """Appearance-Free Link Model."""

    def __init__(self,
                 temporal_module_channels: list = [1, 32, 64, 128, 256],
                 fusion_module_channels: list = [256, 256],
                 classifier_channels: list = [256, 2]):
        super(AFLinkModel, self).__init__()
        self.TemporalModule_1 = nn.Sequential(*[
            TemporalBlock(temporal_module_channels[i],
                          temporal_module_channels[i + 1])
            for i in range(len(temporal_module_channels) - 1)
        ])
        self.TemporalModule_2 = nn.Sequential(*[
            TemporalBlock(temporal_module_channels[i],
                          temporal_module_channels[i + 1])
            for i in range(len(temporal_module_channels) - 1)
        ])
        self.FusionBlock_1 = FusionBlock(*fusion_module_channels)
        self.FusionBlock_2 = FusionBlock(*fusion_module_channels)
        self.pooling = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = Classifier(*classifier_channels)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
        assert not self.training, 'Only testing is supported for AFLink.'
        # keep only the (frame_id, x, y) components of each embedding
        x1 = x1[:, :, :, :3]
        x2 = x2[:, :, :, :3]
        x1 = self.TemporalModule_1(x1)  # [B,1,30,3] -> [B,256,6,3]
        x2 = self.TemporalModule_2(x2)
        x1 = self.FusionBlock_1(x1)
        x2 = self.FusionBlock_2(x2)
        x1 = self.pooling(x1).squeeze(-1).squeeze(-1)
        x2 = self.pooling(x2).squeeze(-1).squeeze(-1)
        y = self.classifier(x1, x2)
        # probability that the two tracklets share one identity
        y = torch.softmax(y, dim=1)[0, 1]
        return y
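

# Illustrative smoke test (not part of the original module; the helper name
# `_demo_aflink_model` is assumed). It shows the input layout AFLinkModel
# expects: each track is a (B, 1, 30, C) tensor whose last axis starts with
# (frame_id, x, y); the four temporal blocks shrink the 30-frame axis
# 30 -> 24 -> 18 -> 12 -> 6 before fusion, pooling and classification.
def _demo_aflink_model() -> float:
    model = AFLinkModel()
    model.eval()  # forward() asserts the model is not in training mode
    track_i = torch.randn(1, 1, 30, 3)  # 30 rows of (frame_id, x, y)
    track_j = torch.randn(1, 1, 30, 3)
    with torch.no_grad():
        score = model(track_i, track_j)
    # scalar association probability; meaningful only with trained weights
    return float(score)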


@TASK_UTILS.register_module()
class AppearanceFreeLink(BaseModule):
    """Appearance-Free Link method.

    This method is proposed in "StrongSORT: Make DeepSORT Great Again"
    `StrongSORT <https://arxiv.org/abs/2202.13514>`_.

    Args:
        checkpoint (str): Checkpoint path.
        temporal_threshold (tuple, optional): The temporal constraint
            for tracklets association. Defaults to (0, 30).
        spatial_threshold (int, optional): The spatial constraint for
            tracklets association. Defaults to 75.
        confidence_threshold (float, optional): The minimum confidence
            threshold for tracklets association. Defaults to 0.95.
    """

    def __init__(self,
                 checkpoint: str,
                 temporal_threshold: tuple = (0, 30),
                 spatial_threshold: int = 75,
                 confidence_threshold: float = 0.95):
        super(AppearanceFreeLink, self).__init__()
        self.temporal_threshold = temporal_threshold
        self.spatial_threshold = spatial_threshold
        self.confidence_threshold = confidence_threshold

        self.model = AFLinkModel()
        if checkpoint:
            load_checkpoint(self.model, checkpoint)
        if torch.cuda.is_available():
            self.model.cuda()
        self.model.eval()

        self.device = next(self.model.parameters()).device
        self.fn_l2 = lambda x, y: np.sqrt(x**2 + y**2)

    def data_transform(self,
                       track1: np.ndarray,
                       track2: np.ndarray,
                       length: int = 30) -> Tuple[np.ndarray, np.ndarray]:
        """Data Transformation.

        This is used to standardize the length of tracks to a unified
        length. Then perform min-max normalization to the motion embeddings.

        Args:
            track1 (ndarray): the first track with shape (N,C).
            track2 (ndarray): the second track with shape (M,C).
            length (int): the unified length of tracks. Defaults to 30.

        Returns:
            Tuple[ndarray, ndarray]: the transformed track1 and track2.
        """
        # fill or cut track1 (keep its last `length` rows)
        length_1 = track1.shape[0]
        track1 = track1[-length:] if length_1 >= length else \
            np.pad(track1, ((length - length_1, 0), (0, 0)))

        # fill or cut track2 (keep its first `length` rows)
        length_2 = track2.shape[0]
        track2 = track2[:length] if length_2 >= length else \
            np.pad(track2, ((0, length - length_2), (0, 0)))

        # min-max normalization
        min_ = np.concatenate((track1, track2), axis=0).min(axis=0)
        max_ = np.concatenate((track1, track2), axis=0).max(axis=0)
        subtractor = (max_ + min_) / 2
        divisor = (max_ - min_) / 2 + 1e-5
        track1 = (track1 - subtractor) / divisor
        track2 = (track2 - subtractor) / divisor
        return track1, track2

    def forward(self, pred_tracks: np.ndarray) -> np.ndarray:
        """Forward function.

        Args:
            pred_tracks (ndarray): With shape (N, 7). Each row denotes
                (frame_id, track_id, x1, y1, x2, y2, score).

        Returns:
            ndarray: The linked tracks with shape (N, 7). Each row denotes
                (frame_id, track_id, x1, y1, x2, y2, score).
        """
        # sort tracks by the frame id
        pred_tracks = pred_tracks[np.argsort(pred_tracks[:, 0])]

        # gather tracks information
        id2info = defaultdict(list)
        for row in pred_tracks:
            frame_id, track_id, x1, y1, x2, y2 = row[:6]
            id2info[track_id].append([frame_id, x1, y1, x2 - x1, y2 - y1])
        id2info = {k: np.array(v) for k, v in id2info.items()}
        num_track = len(id2info)
        track_ids = np.array(list(id2info))
        cost_matrix = np.full((num_track, num_track), INFINITY)

        # compute the cost matrix
        for i, id_i in enumerate(track_ids):
            for j, id_j in enumerate(track_ids):
                if id_i == id_j:
                    continue
                info_i, info_j = id2info[id_i], id2info[id_j]
                frame_i, box_i = info_i[-1][0], info_i[-1][1:3]
                frame_j, box_j = info_j[0][0], info_j[0][1:3]

                # temporal constraint
                if not self.temporal_threshold[0] <= \
                        frame_j - frame_i <= self.temporal_threshold[1]:
                    continue

                # spatial constraint
                if self.fn_l2(box_i[0] - box_j[0], box_i[1] - box_j[1]) \
                        > self.spatial_threshold:
                    continue

                # confidence constraint
                track_i, track_j = self.data_transform(info_i, info_j)

                # numpy to torch
                track_i = torch.tensor(
                    track_i, dtype=torch.float).to(self.device)
                track_j = torch.tensor(
                    track_j, dtype=torch.float).to(self.device)
                track_i = track_i.unsqueeze(0).unsqueeze(0)
                track_j = track_j.unsqueeze(0).unsqueeze(0)

                confidence = self.model(track_i,
                                        track_j).detach().cpu().numpy()
                if confidence >= self.confidence_threshold:
                    cost_matrix[i, j] = 1 - confidence

        # linear assignment
        indices = linear_sum_assignment(cost_matrix)
        _id2id = dict()  # the temporary assignment results
        id2id = dict()  # the final assignment results
        for i, j in zip(indices[0], indices[1]):
            if cost_matrix[i, j] < INFINITY:
                # map matrix indices back to the original track ids
                _id2id[track_ids[i]] = track_ids[j]
        for k, v in _id2id.items():
            # resolve chains of links so every track maps to its root id
            if k in id2id:
                id2id[v] = id2id[k]
            else:
                id2id[v] = k

        # link: relabel each linked track with its root id
        for k, v in id2id.items():
            pred_tracks[pred_tracks[:, 1] == k, 1] = v

        # deduplicate on (frame_id, track_id)
        _, index = np.unique(pred_tracks[:, :2], return_index=True, axis=0)

        return pred_tracks[index]
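

# Illustrative sketch of `data_transform` on synthetic data (the helper name
# `_demo_data_transform` is assumed, not part of the original file). The
# short track is zero-padded at the front (it ends before the gap between
# tracklets) and the long one is truncated to its first 30 rows, so both
# sides reach the unified length before min-max normalization.
def _demo_data_transform() -> None:
    linker = AppearanceFreeLink(checkpoint='')  # empty path skips loading
    track1 = np.random.rand(12, 5)  # 12 rows of (frame_id, x, y, w, h)
    track2 = np.random.rand(45, 5)  # 45 rows; only the first 30 are kept
    t1, t2 = linker.data_transform(track1, track2)
    assert t1.shape == (30, 5) and t2.shape == (30, 5)
    # after normalization every value lies within (-1, 1)
    assert np.abs(t1).max() < 1 and np.abs(t2).max() < 1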
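

if __name__ == '__main__':
    # End-to-end usage sketch with synthetic tracks (illustrative values; no
    # checkpoint is loaded, so the model runs with random weights and will
    # typically not pass the 0.95 confidence threshold). With a trained
    # AFLink checkpoint, pass its path via `checkpoint=` instead.
    tracks = [[frame_id, 0, 10, 10, 30, 50, 0.9] for frame_id in range(10)]
    tracks += [[frame_id, 1, 12, 12, 32, 52, 0.9]
               for frame_id in range(15, 25)]
    pred_tracks = np.array(tracks, dtype=np.float64)
    linker = AppearanceFreeLink(checkpoint='')
    linked = linker.forward(pred_tracks)
    # tracks 0 and 1 are merged only if the model is confident they match
    print('track ids after linking:', np.unique(linked[:, 1]))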