# coding=utf-8
# Copyright 2021 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch Perceiver model. """

import copy
import inspect
import math
import tempfile
import unittest
import warnings
from typing import Dict, List, Tuple

import numpy as np
from datasets import load_dataset

from transformers import PerceiverConfig
from transformers.models.auto import get_values
from transformers.testing_utils import require_torch, require_torch_multi_gpu, require_vision, slow, torch_device
from transformers.utils import is_torch_available, is_vision_available

from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor, random_attention_mask
from ...test_pipeline_mixin import PipelineTesterMixin


if is_torch_available():
    import torch
    from torch import nn

    from transformers import (
        MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING,
        MODEL_FOR_MASKED_LM_MAPPING,
        MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING,
        MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING,
        MODEL_MAPPING,
        PerceiverForImageClassificationConvProcessing,
        PerceiverForImageClassificationFourier,
        PerceiverForImageClassificationLearned,
        PerceiverForMaskedLM,
        PerceiverForMultimodalAutoencoding,
        PerceiverForOpticalFlow,
        PerceiverForSequenceClassification,
        PerceiverModel,
        PerceiverTokenizer,
    )
    from transformers.models.perceiver.modeling_perceiver import PERCEIVER_PRETRAINED_MODEL_ARCHIVE_LIST

if is_vision_available():
    from PIL import Image

    from transformers import PerceiverFeatureExtractor


class PerceiverModelTester:
    def __init__(
        self,
        parent,
        batch_size=13,
        seq_length=7,
        num_channels=3,
        image_size=32,
        train_size=[20, 20],
        num_frames=5,
        audio_samples_per_frame=200,
        samples_per_patch=20,
        nchunks=20,
        num_latents=10,
        d_latents=20,
        num_blocks=1,
        num_self_attends_per_block=2,
        num_self_attention_heads=1,
        num_cross_attention_heads=1,
        self_attention_widening_factor=4,
        cross_attention_widening_factor=4,
        is_training=True,
        use_input_mask=True,
        use_labels=True,
        vocab_size=99,
        hidden_act="gelu",
        attention_probs_dropout_prob=0.1,
        initializer_range=0.02,
        max_position_embeddings=7,
        num_labels=3,
        scope=None,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.seq_length = seq_length
        self.num_channels = num_channels
        self.image_size = image_size
        self.train_size = train_size
        self.num_frames = num_frames
        self.audio_samples_per_frame = audio_samples_per_frame
        self.samples_per_patch = samples_per_patch
        self.nchunks = nchunks
        self.num_latents = num_latents
        self.d_latents = d_latents
        self.num_blocks = num_blocks
        self.num_self_attends_per_block = num_self_attends_per_block
        self.num_self_attention_heads = num_self_attention_heads
        self.num_cross_attention_heads = num_cross_attention_heads
        self.self_attention_widening_factor = self_attention_widening_factor
        self.cross_attention_widening_factor = cross_attention_widening_factor
        self.is_training = is_training
        self.use_input_mask = use_input_mask
        self.use_labels = use_labels
        self.vocab_size = vocab_size
        self.hidden_act = hidden_act
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.initializer_range = initializer_range
        self.num_labels = num_labels
        self.scope = scope
        # set subsampling for multimodal model (take first chunk)
        image_chunk_size = np.prod((self.num_frames, self.image_size, self.image_size)) // self.nchunks
        audio_chunk_size = self.num_frames * self.audio_samples_per_frame // self.samples_per_patch // self.nchunks
        self.subsampling = {
            "image": torch.arange(0, image_chunk_size),
            "audio": torch.arange(0, audio_chunk_size),
            "label": None,
        }
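        # With the defaults above this gives image_chunk_size = (5 * 32 * 32) // 20 = 256
        # and audio_chunk_size = (5 * 200) // 20 // 20 = 2, i.e. the first chunk decodes
        # 256 image positions and 2 audio patches.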

    def prepare_config_and_inputs(self, model_class=None):
        config = self.get_config()
        input_mask = None
        sequence_labels = None
        token_labels = None

        if self.use_labels:
            sequence_labels = ids_tensor([self.batch_size], self.num_labels)
            token_labels = ids_tensor([self.batch_size, self.seq_length], self.num_labels)

        if model_class is None or model_class.__name__ == "PerceiverModel":
            inputs = floats_tensor([self.batch_size, self.seq_length, config.d_model], scale=1.0)
            return config, inputs, input_mask, sequence_labels, token_labels
        elif model_class.__name__ in ["PerceiverForMaskedLM", "PerceiverForSequenceClassification"]:
            inputs = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
            # input mask is only relevant for text inputs
            if self.use_input_mask:
                input_mask = random_attention_mask([self.batch_size, self.seq_length])
        elif model_class.__name__ in [
            "PerceiverForImageClassificationLearned",
            "PerceiverForImageClassificationFourier",
            "PerceiverForImageClassificationConvProcessing",
        ]:
            inputs = floats_tensor([self.batch_size, self.num_channels, self.image_size, self.image_size])
        elif model_class.__name__ == "PerceiverForOpticalFlow":
            inputs = floats_tensor([self.batch_size, 2, 27, self.train_size[0], self.train_size[1]])
        elif model_class.__name__ == "PerceiverForMultimodalAutoencoding":
            images = torch.randn(
                (self.batch_size, self.num_frames, self.num_channels, self.image_size, self.image_size),
                device=torch_device,
            )
            audio = torch.randn(
                (self.batch_size, self.num_frames * self.audio_samples_per_frame, 1), device=torch_device
            )
            inputs = {
                "image": images,
                "audio": audio,
                "label": torch.zeros((self.batch_size, self.num_labels), device=torch_device),
            }
        else:
            raise ValueError(f"Model class {model_class} not supported")

        return config, inputs, input_mask, sequence_labels, token_labels
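
    # For reference, with the default tester values the branches above produce:
    #   PerceiverModel                      -> float inputs of shape (13, 7, d_model)
    #   MaskedLM / SequenceClassification   -> int ids of shape (13, 7) in [0, 99)
    #   image classification heads          -> float inputs of shape (13, 3, 32, 32)
    #   PerceiverForOpticalFlow             -> float inputs of shape (13, 2, 27, 20, 20)
    #   PerceiverForMultimodalAutoencoding  -> dict with image (13, 5, 3, 32, 32),
    #                                          audio (13, 1000, 1) and label (13, 3)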

    def get_config(self):
        return PerceiverConfig(
            num_latents=self.num_latents,
            d_latents=self.d_latents,
            qk_channels=self.d_latents,
            v_channels=self.d_latents,
            num_blocks=self.num_blocks,
            num_self_attends_per_block=self.num_self_attends_per_block,
            num_self_attention_heads=self.num_self_attention_heads,
            num_cross_attention_heads=self.num_cross_attention_heads,
            self_attention_widening_factor=self.self_attention_widening_factor,
            cross_attention_widening_factor=self.cross_attention_widening_factor,
            vocab_size=self.vocab_size,
            hidden_act=self.hidden_act,
            attention_probs_dropout_prob=self.attention_probs_dropout_prob,
            initializer_range=self.initializer_range,
            max_position_embeddings=self.max_position_embeddings,
            image_size=self.image_size,
            train_size=self.train_size,
            num_frames=self.num_frames,
            audio_samples_per_frame=self.audio_samples_per_frame,
            samples_per_patch=self.samples_per_patch,
            num_labels=self.num_labels,
        )

    def get_pipeline_config(self):
        config = self.get_config()
        # Byte level vocab
        config.vocab_size = 261
        config.max_position_embeddings = 40
        return config

    def create_and_check_for_masked_lm(self, config, inputs, input_mask, sequence_labels, token_labels):
        model = PerceiverForMaskedLM(config=config)
        model.to(torch_device)
        model.eval()
        result = model(inputs, attention_mask=input_mask, labels=token_labels)
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))

    def create_and_check_for_sequence_classification(self, config, inputs, input_mask, sequence_labels, token_labels):
        model = PerceiverForSequenceClassification(config=config)
        model.to(torch_device)
        model.eval()
        result = model(inputs, attention_mask=input_mask, labels=sequence_labels)
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))

    def create_and_check_for_image_classification_learned(
        self, config, inputs, input_mask, sequence_labels, token_labels
    ):
        model = PerceiverForImageClassificationLearned(config=config)
        model.to(torch_device)
        model.eval()
        result = model(inputs, attention_mask=input_mask, labels=sequence_labels)
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))

    def create_and_check_for_image_classification_fourier(
        self, config, inputs, input_mask, sequence_labels, token_labels
    ):
        model = PerceiverForImageClassificationFourier(config=config)
        model.to(torch_device)
        model.eval()
        result = model(inputs, attention_mask=input_mask, labels=sequence_labels)
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))

    def create_and_check_for_image_classification_conv(
        self, config, inputs, input_mask, sequence_labels, token_labels
    ):
        model = PerceiverForImageClassificationConvProcessing(config=config)
        model.to(torch_device)
        model.eval()
        result = model(inputs, attention_mask=input_mask, labels=sequence_labels)
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))

    def prepare_config_and_inputs_for_common(self):
        config_and_inputs = self.prepare_config_and_inputs()
        config, inputs, input_mask, sequence_labels, token_labels = config_and_inputs
        inputs_dict = {"inputs": inputs, "attention_mask": input_mask}
        return config, inputs_dict

    def prepare_config_and_inputs_for_model_class(self, model_class):
        config_and_inputs = self.prepare_config_and_inputs(model_class)
        config, inputs, input_mask, sequence_labels, token_labels = config_and_inputs
        inputs_dict = {"inputs": inputs, "attention_mask": input_mask}
        return config, inputs_dict


@require_torch
class PerceiverModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
    all_model_classes = (
        (
            PerceiverModel,
            PerceiverForMaskedLM,
            PerceiverForImageClassificationLearned,
            PerceiverForImageClassificationConvProcessing,
            PerceiverForImageClassificationFourier,
            PerceiverForOpticalFlow,
            PerceiverForMultimodalAutoencoding,
            PerceiverForSequenceClassification,
        )
        if is_torch_available()
        else ()
    )
    pipeline_model_mapping = (
        {
            "feature-extraction": PerceiverModel,
            "fill-mask": PerceiverForMaskedLM,
            "image-classification": (
                PerceiverForImageClassificationConvProcessing,
                PerceiverForImageClassificationFourier,
                PerceiverForImageClassificationLearned,
            ),
            "text-classification": PerceiverForSequenceClassification,
            "zero-shot": PerceiverForSequenceClassification,
        }
        if is_torch_available()
        else {}
    )
    test_pruning = False
    test_head_masking = False
    test_torchscript = False

    maxDiff = None

    def setUp(self):
        self.model_tester = PerceiverModelTester(self)
        self.config_tester = ConfigTester(self, config_class=PerceiverConfig, hidden_size=37)

    def _prepare_for_class(self, inputs_dict, model_class, return_labels=False):
        inputs_dict = copy.deepcopy(inputs_dict)

        if model_class.__name__ == "PerceiverForMultimodalAutoencoding":
            inputs_dict["subsampled_output_points"] = self.model_tester.subsampling

        if return_labels:
            if model_class in [
                *get_values(MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING),
                *get_values(MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING),
            ]:
                inputs_dict["labels"] = torch.zeros(
                    self.model_tester.batch_size, dtype=torch.long, device=torch_device
                )
            elif model_class in [
                *get_values(MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING),
                *get_values(MODEL_FOR_MASKED_LM_MAPPING),
            ]:
                inputs_dict["labels"] = torch.zeros(
                    (self.model_tester.batch_size, self.model_tester.seq_length), dtype=torch.long, device=torch_device
                )
        return inputs_dict

    def test_config(self):
        # we don't test common_properties and arguments_init as these don't apply for Perceiver
        self.config_tester.create_and_test_config_to_json_string()
        self.config_tester.create_and_test_config_to_json_file()
        self.config_tester.create_and_test_config_from_and_save_pretrained()
        self.config_tester.create_and_test_config_with_num_labels()
        self.config_tester.check_config_can_be_init_without_params()

    def test_for_masked_lm(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs(model_class=PerceiverForMaskedLM)
        self.model_tester.create_and_check_for_masked_lm(*config_and_inputs)

    def test_for_sequence_classification(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs(model_class=PerceiverForSequenceClassification)
        self.model_tester.create_and_check_for_sequence_classification(*config_and_inputs)

    def test_for_image_classification_learned(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs(
            model_class=PerceiverForImageClassificationLearned
        )
        self.model_tester.create_and_check_for_image_classification_learned(*config_and_inputs)

    def test_for_image_classification_fourier(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs(
            model_class=PerceiverForImageClassificationFourier
        )
        self.model_tester.create_and_check_for_image_classification_fourier(*config_and_inputs)

    def test_for_image_classification_conv(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs(
            model_class=PerceiverForImageClassificationConvProcessing
        )
        self.model_tester.create_and_check_for_image_classification_conv(*config_and_inputs)

    def test_model_common_attributes(self):
        for model_class in self.all_model_classes:
            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)
            model = model_class(config)
            # we overwrite this, as the embeddings of Perceiver are an instance of nn.Parameter
            # and Perceiver doesn't support get_output_embeddings
            self.assertIsInstance(model.get_input_embeddings(), nn.Parameter)

    def test_training(self):
        if not self.model_tester.is_training:
            return

        for model_class in self.all_model_classes:
            if model_class in [
                *get_values(MODEL_MAPPING),
                PerceiverForOpticalFlow,
                PerceiverForMultimodalAutoencoding,
            ]:
                continue

            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)
            config.return_dict = True

            model = model_class(config)
            model.to(torch_device)
            model.train()
            inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
            loss = model(**inputs).loss
            loss.backward()

    def test_forward_signature(self):
        for model_class in self.all_model_classes:
            config, _ = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)

            model = model_class(config)
            signature = inspect.signature(model.forward)
            # signature.parameters is an OrderedDict => so arg_names order is deterministic
            arg_names = [*signature.parameters.keys()]

            expected_arg_names = ["inputs"]
            self.assertListEqual(arg_names[:1], expected_arg_names)

    def test_determinism(self):
        for model_class in self.all_model_classes:
            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)

            model = model_class(config)
            model.to(torch_device)
            model.eval()
            with torch.no_grad():
                inputs_dict = self._prepare_for_class(inputs_dict, model_class)
                first = model(**inputs_dict)[0]
                second = model(**inputs_dict)[0]

            if model_class.__name__ == "PerceiverForMultimodalAutoencoding":
                # model outputs a dictionary with logits per modality, let's verify each modality
                for modality in first.keys():
                    out_1 = first[modality].cpu().numpy()
                    out_2 = second[modality].cpu().numpy()
                    out_1 = out_1[~np.isnan(out_1)]
                    out_2 = out_2[~np.isnan(out_2)]
                    max_diff = np.amax(np.abs(out_1 - out_2))
                    self.assertLessEqual(max_diff, 1e-5)
            else:
                out_1 = first.cpu().numpy()
                out_2 = second.cpu().numpy()
                out_1 = out_1[~np.isnan(out_1)]
                out_2 = out_2[~np.isnan(out_2)]
                max_diff = np.amax(np.abs(out_1 - out_2))
                self.assertLessEqual(max_diff, 1e-5)

    def test_attention_outputs(self):
        seq_len = getattr(self.model_tester, "num_latents", None)

        for model_class in self.all_model_classes:
            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)
            config.return_dict = True

            inputs_dict["output_attentions"] = True
            inputs_dict["output_hidden_states"] = False
            model = model_class(config)
            model.to(torch_device)
            model.eval()
            with torch.no_grad():
                outputs = model(**self._prepare_for_class(inputs_dict, model_class))

            self_attentions = outputs.attentions
            cross_attentions = outputs.cross_attentions

            # check expected number of attentions depending on model class
            expected_num_self_attentions = self.model_tester.num_blocks * self.model_tester.num_self_attends_per_block
            if model.__class__.__name__ == "PerceiverModel":
                # we expect to have a single cross-attention, namely the one in the PerceiverEncoder
                expected_num_cross_attentions = 1
            else:
                # we expect to have 2 cross-attentions, namely one in the PerceiverEncoder, and one in PerceiverBasicDecoder
                expected_num_cross_attentions = 2
            self.assertEqual(len(self_attentions), expected_num_self_attentions)
            self.assertEqual(len(cross_attentions), expected_num_cross_attentions)

            # check that output_attentions also work using config
            del inputs_dict["output_attentions"]
            config.output_attentions = True
            model = model_class(config)
            model.to(torch_device)
            model.eval()
            with torch.no_grad():
                outputs = model(**self._prepare_for_class(inputs_dict, model_class))

            self_attentions = outputs.attentions
            cross_attentions = outputs.cross_attentions
            self.assertEqual(len(self_attentions), expected_num_self_attentions)
            self.assertEqual(len(cross_attentions), expected_num_cross_attentions)

            self.assertListEqual(
                list(self_attentions[0].shape[-3:]),
                [self.model_tester.num_self_attention_heads, seq_len, seq_len],
            )
            out_len = len(outputs)

            # Check attention is always last and order is fine
            inputs_dict["output_attentions"] = True
            inputs_dict["output_hidden_states"] = True
            model = model_class(config)
            model.to(torch_device)
            model.eval()
            with torch.no_grad():
                outputs = model(**self._prepare_for_class(inputs_dict, model_class))

            self.assertEqual(out_len + 1, len(outputs))

            self_attentions = outputs.attentions
            self.assertEqual(len(self_attentions), expected_num_self_attentions)

            self.assertListEqual(
                list(self_attentions[0].shape[-3:]),
                [self.model_tester.num_self_attention_heads, seq_len, seq_len],
            )

    def test_hidden_states_output(self):
        def check_hidden_states_output(inputs_dict, config, model_class):
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            with torch.no_grad():
                outputs = model(**self._prepare_for_class(inputs_dict, model_class))

            hidden_states = outputs.hidden_states
            expected_num_layers = self.model_tester.num_blocks * self.model_tester.num_self_attends_per_block + 1
            self.assertEqual(len(hidden_states), expected_num_layers)

            seq_length = self.model_tester.num_latents

            self.assertListEqual(
                list(hidden_states[0].shape[-2:]),
                [seq_length, self.model_tester.d_latents],
            )

        for model_class in self.all_model_classes:
            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)

            inputs_dict["output_hidden_states"] = True
            check_hidden_states_output(inputs_dict, config, model_class)

            # check that output_hidden_states also work using config
            del inputs_dict["output_hidden_states"]
            config.output_hidden_states = True
            check_hidden_states_output(inputs_dict, config, model_class)

    def test_model_outputs_equivalence(self):
        def set_nan_tensor_to_zero(t):
            t[t != t] = 0
            return t

        def check_equivalence(model, tuple_inputs, dict_inputs, additional_kwargs={}):
            with torch.no_grad():
                tuple_output = model(**tuple_inputs, return_dict=False, **additional_kwargs)
                dict_output = model(**dict_inputs, return_dict=True, **additional_kwargs).to_tuple()

                def recursive_check(tuple_object, dict_object):
                    if isinstance(tuple_object, (List, Tuple)):
                        for tuple_iterable_value, dict_iterable_value in zip(tuple_object, dict_object):
                            recursive_check(tuple_iterable_value, dict_iterable_value)
                    elif isinstance(tuple_object, Dict):
                        for tuple_iterable_value, dict_iterable_value in zip(
                            tuple_object.values(), dict_object.values()
                        ):
                            recursive_check(tuple_iterable_value, dict_iterable_value)
                    elif tuple_object is None:
                        return
                    else:
                        self.assertTrue(
                            torch.allclose(
                                set_nan_tensor_to_zero(tuple_object), set_nan_tensor_to_zero(dict_object), atol=1e-5
                            ),
                            msg=(
                                "Tuple and dict output are not equal. Difference:"
                                f" {torch.max(torch.abs(tuple_object - dict_object))}. Tuple has `nan`:"
                                f" {torch.isnan(tuple_object).any()} and `inf`: {torch.isinf(tuple_object).any()}."
                                f" Dict has `nan`: {torch.isnan(dict_object).any()} and `inf`:"
                                f" {torch.isinf(dict_object).any()}."
                            ),
                        )

                recursive_check(tuple_output, dict_output)

        for model_class in self.all_model_classes:
            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            tuple_inputs = self._prepare_for_class(inputs_dict, model_class)
            dict_inputs = self._prepare_for_class(inputs_dict, model_class)
            check_equivalence(model, tuple_inputs, dict_inputs)

            if model_class.__name__ not in ["PerceiverForOpticalFlow", "PerceiverForMultimodalAutoencoding"]:
                # optical flow + multimodal models don't support training for now
                tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                check_equivalence(model, tuple_inputs, dict_inputs)

            tuple_inputs = self._prepare_for_class(inputs_dict, model_class)
            dict_inputs = self._prepare_for_class(inputs_dict, model_class)
            check_equivalence(model, tuple_inputs, dict_inputs, {"output_hidden_states": True})

            tuple_inputs = self._prepare_for_class(inputs_dict, model_class)
            dict_inputs = self._prepare_for_class(inputs_dict, model_class)
            check_equivalence(model, tuple_inputs, dict_inputs, {"output_attentions": True})

            if model_class.__name__ not in ["PerceiverForOpticalFlow", "PerceiverForMultimodalAutoencoding"]:
                # optical flow + multimodal models don't support training for now
                tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                check_equivalence(model, tuple_inputs, dict_inputs, {"output_hidden_states": True})

            if model_class.__name__ not in ["PerceiverForOpticalFlow", "PerceiverForMultimodalAutoencoding"]:
                # optical flow + multimodal models don't support training for now
                tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                check_equivalence(model, tuple_inputs, dict_inputs, {"output_attentions": True})

            if model_class.__name__ not in ["PerceiverForOpticalFlow", "PerceiverForMultimodalAutoencoding"]:
                # optical flow + multimodal models don't support training for now
                tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
                check_equivalence(
                    model, tuple_inputs, dict_inputs, {"output_hidden_states": True, "output_attentions": True}
                )

    def test_retain_grad_hidden_states_attentions(self):
        # no need to test all models as different heads yield the same functionality
        model_class = PerceiverForMaskedLM
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)
        config.output_hidden_states = True
        config.output_attentions = True

        model = model_class(config)
        model.to(torch_device)

        inputs = self._prepare_for_class(inputs_dict, model_class)

        outputs = model(**inputs)
        output = outputs[0]

        # Encoder-only model
        hidden_states = outputs.hidden_states[0]
        attentions = outputs.attentions[0]

        hidden_states.retain_grad()
        attentions.retain_grad()

        output.flatten()[0].backward(retain_graph=True)

        self.assertIsNotNone(hidden_states.grad)
        self.assertIsNotNone(attentions.grad)

    def test_feed_forward_chunking(self):
        for model_class in self.all_model_classes:
            original_config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)

            torch.manual_seed(0)
            config = copy.deepcopy(original_config)
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            hidden_states_no_chunk = model(**self._prepare_for_class(inputs_dict, model_class))[0]

            torch.manual_seed(0)
            config.chunk_size_feed_forward = 1
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            hidden_states_with_chunk = model(**self._prepare_for_class(inputs_dict, model_class))[0]

            if model_class.__name__ == "PerceiverForMultimodalAutoencoding":
                # model outputs a dictionary with logits for each modality
                for modality in hidden_states_no_chunk.keys():
                    self.assertTrue(
                        torch.allclose(hidden_states_no_chunk[modality], hidden_states_with_chunk[modality], atol=1e-3)
                    )
            else:
                self.assertTrue(torch.allclose(hidden_states_no_chunk, hidden_states_with_chunk, atol=1e-3))

    def test_save_load(self):
        for model_class in self.all_model_classes:
            config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_model_class(model_class)
            model = model_class(config)
            model.to(torch_device)
            model.eval()
            with torch.no_grad():
                outputs = model(**self._prepare_for_class(inputs_dict, model_class))

            if model_class.__name__ == "PerceiverForMultimodalAutoencoding":
                for modality in outputs[0].keys():
                    out_2 = outputs[0][modality].cpu().numpy()
                    out_2[np.isnan(out_2)] = 0

                    with tempfile.TemporaryDirectory() as tmpdirname:
                        model.save_pretrained(tmpdirname)
                        model = model_class.from_pretrained(tmpdirname)
                        model.to(torch_device)
                        with torch.no_grad():
                            after_outputs = model(**self._prepare_for_class(inputs_dict, model_class))

                        # Make sure we don't have nans
                        out_1 = after_outputs[0][modality].cpu().numpy()
                        out_1[np.isnan(out_1)] = 0
                        max_diff = np.amax(np.abs(out_1 - out_2))
                        self.assertLessEqual(max_diff, 1e-5)
            else:
                out_2 = outputs[0].cpu().numpy()
                out_2[np.isnan(out_2)] = 0

                with tempfile.TemporaryDirectory() as tmpdirname:
                    model.save_pretrained(tmpdirname)
                    model = model_class.from_pretrained(tmpdirname)
                    model.to(torch_device)
                    with torch.no_grad():
                        after_outputs = model(**self._prepare_for_class(inputs_dict, model_class))

                    # Make sure we don't have nans
                    out_1 = after_outputs[0].cpu().numpy()
                    out_1[np.isnan(out_1)] = 0
                    max_diff = np.amax(np.abs(out_1 - out_2))
                    self.assertLessEqual(max_diff, 1e-5)

    def test_correct_missing_keys(self):
        if not self.test_missing_keys:
            return
        config, _ = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            # most Perceiver models don't have a typical head, as is the case with BERT
            if model_class in [
                PerceiverForOpticalFlow,
                PerceiverForMultimodalAutoencoding,
                *get_values(MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING),
                *get_values(MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING),
            ]:
                continue

            model = model_class(config)
            base_model_prefix = model.base_model_prefix

            if hasattr(model, base_model_prefix):
                with tempfile.TemporaryDirectory() as temp_dir_name:
                    model.base_model.save_pretrained(temp_dir_name)
                    model, loading_info = model_class.from_pretrained(temp_dir_name, output_loading_info=True)
                    with self.subTest(msg=f"Missing keys for {model.__class__.__name__}"):
                        self.assertGreater(len(loading_info["missing_keys"]), 0)

    def test_problem_types(self):
        problem_types = [
            {"title": "multi_label_classification", "num_labels": 2, "dtype": torch.float},
            {"title": "single_label_classification", "num_labels": 1, "dtype": torch.long},
            {"title": "regression", "num_labels": 1, "dtype": torch.float},
        ]

        for model_class in self.all_model_classes:
            if model_class not in get_values(MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING):
                continue

            config, inputs, input_mask, _, _ = self.model_tester.prepare_config_and_inputs(model_class=model_class)
            inputs_dict = {"inputs": inputs, "attention_mask": input_mask}

            for problem_type in problem_types:
                with self.subTest(msg=f"Testing {model_class} with {problem_type['title']}"):
                    config.problem_type = problem_type["title"]
                    config.num_labels = problem_type["num_labels"]

                    model = model_class(config)
                    model.to(torch_device)
                    model.train()

                    inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)

                    if problem_type["num_labels"] > 1:
                        inputs["labels"] = inputs["labels"].unsqueeze(1).repeat(1, problem_type["num_labels"])

                    inputs["labels"] = inputs["labels"].to(problem_type["dtype"])

                    # This tests that we do not trigger the warning from PyTorch "Using a target size that is different
                    # to the input size. This will likely lead to incorrect results due to broadcasting. Please ensure
                    # they have the same size.", which is a symptom that something is wrong for the regression problem.
                    # See https://github.com/huggingface/transformers/issues/11780
                    with warnings.catch_warnings(record=True) as warning_list:
                        loss = model(**inputs).loss
                    for w in warning_list:
                        if "Using a target size that is different to the input size" in str(w.message):
                            raise ValueError(
                                f"Something is going wrong in the regression problem: intercepted {w.message}"
                            )

                    loss.backward()

    @require_torch_multi_gpu
    def test_multi_gpu_data_parallel_forward(self):
        pass

    def test_save_load_fast_init_from_base(self):
        pass

    def test_save_load_fast_init_to_base(self):
        pass

    def test_resize_tokens_embeddings(self):
        pass

    def test_resize_embeddings_untied(self):
        pass

    def test_inputs_embeds(self):
        pass

    def test_load_with_mismatched_shapes(self):
        pass

    @slow
    def test_model_from_pretrained(self):
        for model_name in PERCEIVER_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
            model = PerceiverModel.from_pretrained(model_name)
            self.assertIsNotNone(model)


# We will verify our results on an image of cute cats
def prepare_img():
    image = Image.open("./tests/fixtures/tests_samples/COCO/000000039769.png")
    return image


# Helper functions for optical flow integration test
def prepare_optical_flow_images():
    dataset = load_dataset("hf-internal-testing/fixtures_sintel", split="test")
    image1 = Image.open(dataset[0]["file"]).convert("RGB")
    image2 = Image.open(dataset[0]["file"]).convert("RGB")
    return image1, image2


def normalize(img):
    return img / 255.0 * 2 - 1
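

# For reference: normalize maps uint8 pixel values from [0, 255] onto [-1.0, 1.0],
# e.g. normalize(np.array([0.0, 127.5, 255.0])) gives array([-1., 0., 1.]).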


def extract_image_patches(x, kernel, stride=1, dilation=1):
    # Do TF 'SAME' Padding
    b, c, h, w = x.shape
    h2 = math.ceil(h / stride)
    w2 = math.ceil(w / stride)
    pad_row = (h2 - 1) * stride + (kernel - 1) * dilation + 1 - h
    pad_col = (w2 - 1) * stride + (kernel - 1) * dilation + 1 - w
    x = torch.nn.functional.pad(x, (pad_row // 2, pad_row - pad_row // 2, pad_col // 2, pad_col - pad_col // 2))

    # Extract patches
    patches = x.unfold(2, kernel, stride).unfold(3, kernel, stride)
    patches = patches.permute(0, 4, 5, 1, 2, 3).contiguous()

    return patches.view(b, -1, patches.shape[-2], patches.shape[-1])
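

# With stride=1 the 'SAME' padding preserves the spatial size, so an RGB input of shape
# (b, 3, H, W) with kernel=3 yields patches of shape (b, 3 * 3 * 3, H, W) = (b, 27, H, W).
# This is where the 27-channel optical-flow inputs used elsewhere in this file come from.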


@require_torch
@require_vision
class PerceiverModelIntegrationTest(unittest.TestCase):
    @slow
    def test_inference_masked_lm(self):
        tokenizer = PerceiverTokenizer.from_pretrained("deepmind/language-perceiver")
        model = PerceiverForMaskedLM.from_pretrained("deepmind/language-perceiver")
        model.to(torch_device)

        # prepare inputs
        text = "This is an incomplete sentence where some words are missing."
        encoding = tokenizer(text, padding="max_length", return_tensors="pt")

        # mask " missing." (the tokenizer is byte-level, so each character is one token)
        encoding.input_ids[0, 52:61] = tokenizer.mask_token_id
        inputs, input_mask = encoding.input_ids.to(torch_device), encoding.attention_mask.to(torch_device)

        # forward pass
        with torch.no_grad():
            outputs = model(inputs=inputs, attention_mask=input_mask)
        logits = outputs.logits

        # verify logits
        expected_shape = torch.Size((1, tokenizer.model_max_length, tokenizer.vocab_size))
        self.assertEqual(logits.shape, expected_shape)

        expected_slice = torch.tensor(
            [[-10.8609, -10.7651, -10.9187], [-12.1689, -11.9389, -12.1479], [-12.1518, -11.9707, -12.2073]],
            device=torch_device,
        )
        self.assertTrue(torch.allclose(logits[0, :3, :3], expected_slice, atol=1e-4))

        expected_greedy_predictions = [38, 115, 111, 121, 121, 111, 116, 109, 52]
        masked_tokens_predictions = logits[0, 52:61].argmax(dim=-1).tolist()
        self.assertListEqual(expected_greedy_predictions, masked_tokens_predictions)

    @slow
    def test_inference_image_classification(self):
        feature_extractor = PerceiverFeatureExtractor()
        model = PerceiverForImageClassificationLearned.from_pretrained("deepmind/vision-perceiver-learned")
        model.to(torch_device)

        # prepare inputs
        image = prepare_img()
        inputs = feature_extractor(image, return_tensors="pt").pixel_values.to(torch_device)
        input_mask = None

        # forward pass
        with torch.no_grad():
            outputs = model(inputs=inputs, attention_mask=input_mask)
        logits = outputs.logits

        # verify logits
        expected_shape = torch.Size((1, model.config.num_labels))
        self.assertEqual(logits.shape, expected_shape)

        expected_slice = torch.tensor([-1.1652, -0.1992, -0.7520], device=torch_device)
        self.assertTrue(torch.allclose(logits[0, :3], expected_slice, atol=1e-4))

    @slow
    def test_inference_image_classification_fourier(self):
        feature_extractor = PerceiverFeatureExtractor()
        model = PerceiverForImageClassificationFourier.from_pretrained("deepmind/vision-perceiver-fourier")
        model.to(torch_device)

        # prepare inputs
        image = prepare_img()
        inputs = feature_extractor(image, return_tensors="pt").pixel_values.to(torch_device)
        input_mask = None

        # forward pass
        with torch.no_grad():
            outputs = model(inputs=inputs, attention_mask=input_mask)
        logits = outputs.logits

        # verify logits
        expected_shape = torch.Size((1, model.config.num_labels))
        self.assertEqual(logits.shape, expected_shape)

        expected_slice = torch.tensor([-1.1295, -0.2832, 0.3226], device=torch_device)
        self.assertTrue(torch.allclose(logits[0, :3], expected_slice, atol=1e-4))

    @slow
    def test_inference_image_classification_conv(self):
        feature_extractor = PerceiverFeatureExtractor()
        model = PerceiverForImageClassificationConvProcessing.from_pretrained("deepmind/vision-perceiver-conv")
        model.to(torch_device)

        # prepare inputs
        image = prepare_img()
        inputs = feature_extractor(image, return_tensors="pt").pixel_values.to(torch_device)
        input_mask = None

        # forward pass
        with torch.no_grad():
            outputs = model(inputs=inputs, attention_mask=input_mask)
        logits = outputs.logits

        # verify logits
        expected_shape = torch.Size((1, model.config.num_labels))
        self.assertEqual(logits.shape, expected_shape)

        expected_slice = torch.tensor([-1.1186, 0.0554, 0.0897], device=torch_device)
        self.assertTrue(torch.allclose(logits[0, :3], expected_slice, atol=1e-4))

    @slow
    def test_inference_optical_flow(self):
        model = PerceiverForOpticalFlow.from_pretrained("deepmind/optical-flow-perceiver")
        model.to(torch_device)

        # prepare inputs
        image1, image2 = prepare_optical_flow_images()
        img1 = normalize(np.array(image1))
        img2 = normalize(np.array(image2))

        # stack images
        img1 = torch.tensor(np.moveaxis(img1, -1, 0))
        img2 = torch.tensor(np.moveaxis(img2, -1, 0))
        images = torch.stack([img1, img2], dim=0)

        # crop to the training resolution and extract 3x3 patches
        patch_size = model.config.train_size

        inputs = images[..., : patch_size[0], : patch_size[1]].unsqueeze(0)
        batch_size, _, C, H, W = inputs.shape
        patches = extract_image_patches(inputs.view(batch_size * 2, C, H, W), kernel=3)
        _, C, H, W = patches.shape
        patches = patches.view(batch_size, -1, C, H, W).float()

        # forward pass
        with torch.no_grad():
            outputs = model(inputs=patches.to(torch_device))
        logits = outputs.logits

        # verify logits
        expected_shape = torch.Size((1, 368, 496, 2))
        self.assertEqual(logits.shape, expected_shape)

        expected_slice = torch.tensor(
            [
                [[0.0025, -0.0050], [0.0025, -0.0049], [0.0025, -0.0048]],
                [[0.0026, -0.0049], [0.0026, -0.0048], [0.0026, -0.0047]],
                [[0.0026, -0.0049], [0.0026, -0.0048], [0.0026, -0.0046]],
            ],
            device=torch_device,
        )
        self.assertTrue(torch.allclose(logits[0, :3, :3, :3], expected_slice, atol=1e-4))