# Copyright (c) OpenMMLab. All rights reserved.
import math
from numbers import Number
from typing import List, Optional, Sequence, Tuple, Union

import torch
import torch.nn.functional as F
from mmengine.model import (BaseDataPreprocessor, ImgDataPreprocessor,
                            stack_batch)

from mmpretrain.registry import MODELS
from mmpretrain.structures import (DataSample, MultiTaskDataSample,
                                   batch_label_to_onehot, cat_batch_labels,
                                   tensor_split)
from .batch_augments import RandomBatchAugment
@MODELS.register_module()
class ClsDataPreprocessor(BaseDataPreprocessor):
    """Image pre-processor for classification tasks.

    Comparing with the :class:`mmengine.model.ImgDataPreprocessor`,

    1. It won't do normalization if ``mean`` is not specified.
    2. It does normalization and color space conversion after stacking batch.
    3. It supports batch augmentations like mixup and cutmix.

    It provides the data pre-processing as follows

    - Collate and move data to the target device.
    - Pad inputs to the maximum size of current batch with defined
      ``pad_value``. The padding size can be divisible by a defined
      ``pad_size_divisor``
    - Stack inputs to batch_inputs.
    - Convert inputs from bgr to rgb if the shape of input is (3, H, W).
    - Normalize image with defined std and mean.
    - Do batch augmentations like Mixup and Cutmix during training.

    Args:
        mean (Sequence[Number], optional): The pixel mean of R, G, B channels.
            Defaults to None.
        std (Sequence[Number], optional): The pixel standard deviation of
            R, G, B channels. Defaults to None.
        pad_size_divisor (int): The size of padded image should be
            divisible by ``pad_size_divisor``. Defaults to 1.
        pad_value (Number): The padded pixel value. Defaults to 0.
        to_rgb (bool): whether to convert image from BGR to RGB.
            Defaults to False.
        to_onehot (bool): Whether to generate one-hot format gt-labels and set
            to data samples. Defaults to False.
        num_classes (int, optional): The number of classes. Defaults to None.
        batch_augments (dict, optional): The batch augmentations settings,
            including "augments" and "probs". For more details, see
            :class:`mmpretrain.models.RandomBatchAugment`.
    """

    def __init__(self,
                 mean: Sequence[Number] = None,
                 std: Sequence[Number] = None,
                 pad_size_divisor: int = 1,
                 pad_value: Number = 0,
                 to_rgb: bool = False,
                 to_onehot: bool = False,
                 num_classes: Optional[int] = None,
                 batch_augments: Optional[dict] = None):
        super().__init__()
        self.pad_size_divisor = pad_size_divisor
        self.pad_value = pad_value
        self.to_rgb = to_rgb
        self.to_onehot = to_onehot
        self.num_classes = num_classes

        if mean is not None:
            assert std is not None, 'To enable the normalization in ' \
                'preprocessing, please specify both `mean` and `std`.'
            # Enable the normalization in preprocessing.
            self._enable_normalize = True
            # Buffers (not persisted in state_dict) so they follow the
            # module across devices with `.to()` / `.cuda()`.
            self.register_buffer('mean',
                                 torch.tensor(mean).view(-1, 1, 1), False)
            self.register_buffer('std',
                                 torch.tensor(std).view(-1, 1, 1), False)
        else:
            self._enable_normalize = False

        if batch_augments:
            self.batch_augments = RandomBatchAugment(**batch_augments)
            if not self.to_onehot:
                # Batch augments (Mixup/CutMix) produce soft labels, which
                # require one-hot format targets; enable it automatically.
                from mmengine.logging import MMLogger
                MMLogger.get_current_instance().info(
                    'Because batch augmentations are enabled, the data '
                    'preprocessor automatically enables the `to_onehot` '
                    'option to generate one-hot format labels.')
                self.to_onehot = True
        else:
            self.batch_augments = None

    def forward(self, data: dict, training: bool = False) -> dict:
        """Perform normalization, padding, bgr2rgb conversion and batch
        augmentation based on ``BaseDataPreprocessor``.

        Args:
            data (dict): data sampled from dataloader.
            training (bool): Whether to enable training time augmentation.

        Returns:
            dict: Data in the same format as the model input.
        """
        inputs = self.cast_data(data['inputs'])

        if isinstance(inputs, torch.Tensor):
            # The branch if use `default_collate` as the collate_fn in the
            # dataloader.

            # ------ To RGB ------
            if self.to_rgb and inputs.size(1) == 3:
                inputs = inputs.flip(1)

            # -- Normalization ---
            inputs = inputs.float()
            if self._enable_normalize:
                inputs = (inputs - self.mean) / self.std

            # ------ Padding -----
            if self.pad_size_divisor > 1:
                h, w = inputs.shape[-2:]

                target_h = math.ceil(
                    h / self.pad_size_divisor) * self.pad_size_divisor
                target_w = math.ceil(
                    w / self.pad_size_divisor) * self.pad_size_divisor
                pad_h = target_h - h
                pad_w = target_w - w
                inputs = F.pad(inputs, (0, pad_w, 0, pad_h), 'constant',
                               self.pad_value)
        else:
            # The branch if use `pseudo_collate` as the collate_fn in the
            # dataloader.
            processed_inputs = []
            for input_ in inputs:
                # ------ To RGB ------
                if self.to_rgb and input_.size(0) == 3:
                    input_ = input_.flip(0)

                # -- Normalization ---
                input_ = input_.float()
                if self._enable_normalize:
                    input_ = (input_ - self.mean) / self.std

                processed_inputs.append(input_)
            # Combine padding and stack
            inputs = stack_batch(processed_inputs, self.pad_size_divisor,
                                 self.pad_value)

        data_samples = data.get('data_samples', None)
        sample_item = data_samples[0] if data_samples is not None else None

        if isinstance(sample_item, DataSample):
            batch_label = None
            batch_score = None

            if 'gt_label' in sample_item:
                gt_labels = [sample.gt_label for sample in data_samples]
                batch_label, label_indices = cat_batch_labels(gt_labels)
                batch_label = batch_label.to(self.device)
            if 'gt_score' in sample_item:
                gt_scores = [sample.gt_score for sample in data_samples]
                batch_score = torch.stack(gt_scores).to(self.device)
            elif self.to_onehot and 'gt_label' in sample_item:
                assert batch_label is not None, \
                    'Cannot generate onehot format labels because no labels.'
                num_classes = self.num_classes or sample_item.get(
                    'num_classes')
                assert num_classes is not None, \
                    'Cannot generate one-hot format labels because not set ' \
                    '`num_classes` in `data_preprocessor`.'
                batch_score = batch_label_to_onehot(
                    batch_label, label_indices, num_classes).to(self.device)

            # ----- Batch Augmentations ----
            if (training and self.batch_augments is not None
                    and batch_score is not None):
                inputs, batch_score = self.batch_augments(inputs, batch_score)

            # ----- scatter labels and scores to data samples ---
            if batch_label is not None:
                for sample, label in zip(
                        data_samples, tensor_split(batch_label,
                                                   label_indices)):
                    sample.set_gt_label(label)
            if batch_score is not None:
                for sample, score in zip(data_samples, batch_score):
                    sample.set_gt_score(score)
        elif isinstance(sample_item, MultiTaskDataSample):
            data_samples = self.cast_data(data_samples)

        return {'inputs': inputs, 'data_samples': data_samples}
@MODELS.register_module()
class SelfSupDataPreprocessor(ImgDataPreprocessor):
    """Image pre-processor for operations, like normalization and bgr to rgb.

    Compared with the :class:`mmengine.ImgDataPreprocessor`, this module
    supports ``inputs`` as torch.Tensor or a list of torch.Tensor.
    """

    def __init__(self,
                 mean: Optional[Sequence[Union[float, int]]] = None,
                 std: Optional[Sequence[Union[float, int]]] = None,
                 pad_size_divisor: int = 1,
                 pad_value: Union[float, int] = 0,
                 to_rgb: bool = False,
                 bgr_to_rgb: bool = False,
                 rgb_to_bgr: bool = False,
                 non_blocking: Optional[bool] = False):
        super().__init__(
            mean=mean,
            std=std,
            pad_size_divisor=pad_size_divisor,
            pad_value=pad_value,
            bgr_to_rgb=bgr_to_rgb,
            rgb_to_bgr=rgb_to_bgr,
            non_blocking=non_blocking)

        # `to_rgb` is accepted as an alias of `bgr_to_rgb` for config
        # compatibility; any of the three flags enables channel flipping.
        self._channel_conversion = to_rgb or bgr_to_rgb or rgb_to_bgr

    def forward(self, data: dict, training: bool = False) -> dict:
        """Performs normalization and bgr2rgb conversion based on
        ``BaseDataPreprocessor``.

        Args:
            data (dict): data sampled from dataloader.
            training (bool): Whether to enable training time augmentation. If
                subclasses override this method, they can perform different
                preprocessing strategies for training and testing based on the
                value of ``training``.

        Returns:
            dict: Data in the same format as the model input.
        """
        assert isinstance(data, dict), (
            'Please use default_collate in dataloader, '
            'instead of pseudo_collate.')
        data = [val for _, val in data.items()]
        batch_inputs, batch_data_samples = self.cast_data(data)

        # Here is what is different from :class:`mmengine.ImgDataPreprocessor`
        # Since there are multiple views for an image for some algorithms,
        # e.g. SimCLR, each item in inputs is a list, containing multi-views
        # for an image.
        if isinstance(batch_inputs, list):
            # channel transform
            if self._channel_conversion:
                batch_inputs = [
                    _input[:, [2, 1, 0], ...] for _input in batch_inputs
                ]

            # convert to float after channel conversion to ensure efficiency
            batch_inputs = [_input.float() for _input in batch_inputs]

            # normalization.
            if self._enable_normalize:
                batch_inputs = [(_input - self.mean) / self.std
                                for _input in batch_inputs]
        else:
            # channel transform
            if self._channel_conversion:
                batch_inputs = batch_inputs[:, [2, 1, 0], ...]

            # convert to float after channel conversion to ensure efficiency
            batch_inputs = batch_inputs.float()

            # normalization.
            if self._enable_normalize:
                batch_inputs = (batch_inputs - self.mean) / self.std

        return {'inputs': batch_inputs, 'data_samples': batch_data_samples}
@MODELS.register_module()
class TwoNormDataPreprocessor(SelfSupDataPreprocessor):
    """Image pre-processor for CAE, BEiT v1/v2, etc.

    Compared with the :class:`mmselfsup.SelfSupDataPreprocessor`, this module
    will normalize the prediction image and target image with different
    normalization parameters.

    Args:
        mean (Sequence[float or int], optional): The pixel mean of image
            channels. If ``to_rgb=True`` it means the mean value of R, G, B
            channels. If the length of `mean` is 1, it means all channels have
            the same mean value, or the input is a gray image. If it is not
            specified, images will not be normalized. Defaults to None.
        std (Sequence[float or int], optional): The pixel standard deviation of
            image channels. If ``to_rgb=True`` it means the standard deviation
            of R, G, B channels. If the length of `std` is 1, it means all
            channels have the same standard deviation, or the input is a gray
            image. If it is not specified, images will not be normalized.
            Defaults to None.
        second_mean (Sequence[float or int], optional): The description is
            like ``mean``, it can be customized for targe image. Defaults to
            None.
        second_std (Sequence[float or int], optional): The description is
            like ``std``, it can be customized for targe image. Defaults to
            None.
        pad_size_divisor (int): The size of padded image should be
            divisible by ``pad_size_divisor``. Defaults to 1.
        pad_value (float or int): The padded pixel value. Defaults to 0.
        to_rgb (bool): whether to convert image from BGR to RGB.
            Defaults to False.
        non_blocking (bool): Whether block current process when transferring
            data to device. Defaults to False.
    """

    def __init__(self,
                 mean: Optional[Sequence[Union[float, int]]] = None,
                 std: Optional[Sequence[Union[float, int]]] = None,
                 second_mean: Sequence[Union[float, int]] = None,
                 second_std: Sequence[Union[float, int]] = None,
                 pad_size_divisor: int = 1,
                 pad_value: Union[float, int] = 0,
                 to_rgb: bool = False,
                 non_blocking: Optional[bool] = False):
        super().__init__(
            mean=mean,
            std=std,
            pad_size_divisor=pad_size_divisor,
            pad_value=pad_value,
            to_rgb=to_rgb,
            non_blocking=non_blocking)
        assert (second_mean is not None) and (second_std is not None), (
            'mean and std should not be None while using '
            '`TwoNormDataPreprocessor`')
        assert len(second_mean) == 3 or len(second_mean) == 1, (
            '`mean` should have 1 or 3 values, to be compatible with '
            f'RGB or gray image, but got {len(second_mean)} values')
        # BUGFIX: the message previously interpolated `len(std)` (and would
        # raise TypeError if `std` were None); report `second_std`'s length.
        assert len(second_std) == 3 or len(second_std) == 1, (
            '`std` should have 1 or 3 values, to be compatible with RGB '
            f'or gray image, but got {len(second_std)} values')

        # Non-persistent buffers so they move with the module's device.
        self.register_buffer('second_mean',
                             torch.tensor(second_mean).view(-1, 1, 1), False)
        self.register_buffer('second_std',
                             torch.tensor(second_std).view(-1, 1, 1), False)

    def forward(self, data: dict, training: bool = False) -> dict:
        """Performs normalization and bgr2rgb conversion based on
        ``BaseDataPreprocessor``. The ``batch_inputs`` in forward function is a
        list.

        Args:
            data (dict): data sampled from dataloader.
            training (bool): Whether to enable training time augmentation. If
                subclasses override this method, they can perform different
                preprocessing strategies for training and testing based on the
                value of ``training``.

        Returns:
            dict: Data in the same format as the model input.
        """
        data = [val for _, val in data.items()]
        batch_inputs, batch_data_samples = self.cast_data(data)

        # channel transform
        if self._channel_conversion:
            batch_inputs = [
                _input[:, [2, 1, 0], ...] for _input in batch_inputs
            ]

        # convert to float after channel conversion to ensure efficiency
        batch_inputs = [_input.float() for _input in batch_inputs]

        # Normalization. Here is what is different from
        # :class:`mmselfsup.SelfSupDataPreprocessor`. Normalize the target
        # image and prediction image with different normalization params
        if self._enable_normalize:
            batch_inputs = [
                (batch_inputs[0] - self.mean) / self.std,
                (batch_inputs[1] - self.second_mean) / self.second_std
            ]

        return {'inputs': batch_inputs, 'data_samples': batch_data_samples}
@MODELS.register_module()
class VideoDataPreprocessor(BaseDataPreprocessor):
    """Video pre-processor for operations, like normalization and bgr to rgb
    conversion.

    Compared with the :class:`mmaction.ActionDataPreprocessor`, this module
    supports ``inputs`` as torch.Tensor or a list of torch.Tensor.

    Args:
        mean (Sequence[float or int], optional): The pixel mean of channels
            of images or stacked optical flow. Defaults to None.
        std (Sequence[float or int], optional): The pixel standard deviation
            of channels of images or stacked optical flow. Defaults to None.
        pad_size_divisor (int): The size of padded image should be
            divisible by ``pad_size_divisor``. Defaults to 1.
        pad_value (float or int): The padded pixel value. Defaults to 0.
        to_rgb (bool): Whether to convert image from BGR to RGB.
            Defaults to False.
        format_shape (str): Format shape of input data.
            Defaults to ``'NCHW'``.
    """

    def __init__(self,
                 mean: Optional[Sequence[Union[float, int]]] = None,
                 std: Optional[Sequence[Union[float, int]]] = None,
                 pad_size_divisor: int = 1,
                 pad_value: Union[float, int] = 0,
                 to_rgb: bool = False,
                 format_shape: str = 'NCHW') -> None:
        super().__init__()
        self.pad_size_divisor = pad_size_divisor
        self.pad_value = pad_value
        self.to_rgb = to_rgb
        self.format_shape = format_shape

        if mean is not None:
            assert std is not None, 'To enable the normalization in ' \
                                    'preprocessing, please specify both ' \
                                    '`mean` and `std`.'
            # Enable the normalization in preprocessing.
            self._enable_normalize = True
            # Broadcast shape depends on the layout: frames carry an extra
            # temporal dim in 'NCTHW'.
            if self.format_shape == 'NCHW':
                normalizer_shape = (-1, 1, 1)
            elif self.format_shape == 'NCTHW':
                normalizer_shape = (-1, 1, 1, 1)
            else:
                raise ValueError(f'Invalid format shape: {format_shape}')

            self.register_buffer(
                'mean',
                torch.tensor(mean, dtype=torch.float32).view(normalizer_shape),
                False)
            self.register_buffer(
                'std',
                torch.tensor(std, dtype=torch.float32).view(normalizer_shape),
                False)
        else:
            self._enable_normalize = False

    def forward(self, data: dict, training: bool = False) -> dict:
        """Performs normalization, padding and bgr2rgb conversion based on
        ``BaseDataPreprocessor``.

        Args:
            data (dict): data sampled from dataloader.
            training (bool): Whether to enable training time augmentation. If
                subclasses override this method, they can perform different
                preprocessing strategies for training and testing based on the
                value of ``training``.

        Returns:
            dict: Data in the same format as the model input.
        """
        data = [val for _, val in data.items()]
        batch_inputs, batch_data_samples = self.cast_data(data)

        if isinstance(batch_inputs, list):
            # channel transform
            if self.to_rgb:
                if self.format_shape == 'NCHW':
                    batch_inputs = [
                        _input[..., [2, 1, 0], :, :] for _input in batch_inputs
                    ]
                elif self.format_shape == 'NCTHW':
                    batch_inputs = [
                        _input[..., [2, 1, 0], :, :, :]
                        for _input in batch_inputs
                    ]
                else:
                    raise ValueError(
                        f'Invalid format shape: {self.format_shape}')

            # convert to float after channel conversion to ensure efficiency
            batch_inputs = [_input.float() for _input in batch_inputs]

            # normalization
            if self._enable_normalize:
                batch_inputs = [(_input - self.mean) / self.std
                                for _input in batch_inputs]
        else:
            # channel transform
            if self.to_rgb:
                if self.format_shape == 'NCHW':
                    batch_inputs = batch_inputs[..., [2, 1, 0], :, :]
                elif self.format_shape == 'NCTHW':
                    batch_inputs = batch_inputs[..., [2, 1, 0], :, :, :]
                else:
                    raise ValueError(
                        f'Invalid format shape: {self.format_shape}')

            # convert to float after channel conversion to ensure efficiency
            batch_inputs = batch_inputs.float()

            # normalization
            if self._enable_normalize:
                batch_inputs = (batch_inputs - self.mean) / self.std

        return {'inputs': batch_inputs, 'data_samples': batch_data_samples}
@MODELS.register_module()
class MultiModalDataPreprocessor(BaseDataPreprocessor):
    """Data pre-processor for image-text multimodality tasks.

    It provides the data pre-processing as follows

    - Collate and move data to the target device.
    - Pad inputs to the maximum size of current batch with defined
      ``pad_value``. The padding size can be divisible by a defined
      ``pad_size_divisor``
    - Stack inputs to batch_inputs.
    - Convert inputs from bgr to rgb if the shape of input is (3, H, W).
    - Normalize image with defined std and mean.

    Args:
        mean (Sequence[Number], optional): The pixel mean of R, G, B channels.
            Defaults to None.
        std (Sequence[Number], optional): The pixel standard deviation of
            R, G, B channels. Defaults to None.
        pad_size_divisor (int): The size of padded image should be
            divisible by ``pad_size_divisor``. Defaults to 1.
        pad_value (Number): The padded pixel value. Defaults to 0.
        to_rgb (bool): whether to convert image from BGR to RGB.
            Defaults to False.
    """

    def __init__(
        self,
        mean: Sequence[Number] = None,
        std: Sequence[Number] = None,
        pad_size_divisor: int = 1,
        pad_value: Number = 0,
        to_rgb: bool = False,
    ):
        super().__init__()
        self.pad_size_divisor = pad_size_divisor
        self.pad_value = pad_value
        self.to_rgb = to_rgb

        if mean is not None:
            assert std is not None, 'To enable the normalization in ' \
                'preprocessing, please specify both `mean` and `std`.'
            # Enable the normalization in preprocessing.
            self._enable_normalize = True
            # Non-persistent buffers so they move with the module's device.
            self.register_buffer('mean',
                                 torch.tensor(mean).view(-1, 1, 1), False)
            self.register_buffer('std',
                                 torch.tensor(std).view(-1, 1, 1), False)
        else:
            self._enable_normalize = False

    def forward(self, data: dict, training: bool = False) -> dict:
        """Perform normalization, padding, bgr2rgb conversion and batch
        augmentation based on ``BaseDataPreprocessor``.

        Args:
            data (dict): data sampled from dataloader.
            training (bool): Whether to enable training time augmentation.

        Returns:
            dict: Data in the same format as the model input.
        """
        data = self.cast_data(data)

        imgs = data.get('inputs', None)

        def _process_img(img):
            # Apply BGR->RGB flip, normalization and divisor padding to a
            # single batched image tensor of shape (B, C, H, W).
            # ------ To RGB ------
            if self.to_rgb and img.size(1) == 3:
                img = img.flip(1)

            # -- Normalization ---
            img = img.float()
            if self._enable_normalize:
                img = (img - self.mean) / self.std

            # ------ Padding -----
            if self.pad_size_divisor > 1:
                h, w = img.shape[-2:]

                target_h = math.ceil(
                    h / self.pad_size_divisor) * self.pad_size_divisor
                target_w = math.ceil(
                    w / self.pad_size_divisor) * self.pad_size_divisor
                pad_h = target_h - h
                pad_w = target_w - w
                img = F.pad(img, (0, pad_w, 0, pad_h), 'constant',
                            self.pad_value)
            return img

        if isinstance(imgs, torch.Tensor):
            imgs = _process_img(imgs)
        elif isinstance(imgs, Sequence):
            # B, T, C, H, W
            imgs = torch.stack([_process_img(img) for img in imgs], dim=1)
        elif imgs is not None:
            raise ValueError(f'{type(imgs)} is not supported for imgs inputs.')

        data_samples = data.get('data_samples', None)

        return {'images': imgs, 'data_samples': data_samples}