# Copyright (c) OpenMMLab. All rights reserved. # Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. # Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. # flake8: noqa import math import torch from torch import nn from torch.utils.checkpoint import checkpoint try: from transformers.models.bert.configuration_bert import BertConfig except: BertConfig = None from mmpretrain.registry import MODELS from ..blip.language_model import BertAttention, BertIntermediate, BertOutput def gelu(x): """Original Implementation of the gelu activation function in Google Bert repo when initially created. For information: OpenAI GPT's gelu is slightly different (and gives slightly different results): 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3)))) Also see https://arxiv.org/abs/1606.08415 """ # noqa return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0))) def gelu_new(x): """Implementation of the gelu activation function currently in Google Bert repo (identical to OpenAI GPT) https://arxiv.org/abs/1606.08415.""" return 0.5 * x * (1 + torch.tanh( math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3)))) def swish(x): return x * torch.sigmoid(x) ACT2FN = { 'gelu': gelu, 'relu': torch.nn.functional.relu, 'swish': swish, 'gelu_new': gelu_new } class BertEmbeddings(nn.Module): """Construct the embeddings from word, position and token_type embeddings.""" def __init__(self, config): super(BertEmbeddings, self).__init__() self.word_embeddings = nn.Embedding( config.vocab_size, config.hidden_size, padding_idx=0) self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size) self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size) # self.LayerNorm is not snake-cased to stick with TensorFlow model # variable name and be able to load any TensorFlow checkpoint file self.LayerNorm = nn.LayerNorm( config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob) def forward(self, input_ids, token_type_ids=None, position_ids=None): seq_length = input_ids.size(1) if position_ids is None: position_ids = torch.arange( seq_length, dtype=torch.long, device=input_ids.device) position_ids = position_ids.unsqueeze(0).expand_as(input_ids) if token_type_ids is None: token_type_ids = torch.zeros_like(input_ids) words_embeddings = self.word_embeddings(input_ids) position_embeddings = self.position_embeddings(position_ids) token_type_embeddings = self.token_type_embeddings(token_type_ids) embeddings = words_embeddings + position_embeddings \ + token_type_embeddings embeddings = self.LayerNorm(embeddings) embeddings = self.dropout(embeddings) return embeddings class BertLayer(nn.Module): def __init__(self, config): super(BertLayer, self).__init__() self.attention = BertAttention(config) self.intermediate = BertIntermediate(config) self.output = BertOutput(config) def forward(self, hidden_states, attention_mask=None, head_mask=None): attention_outputs = self.attention(hidden_states, attention_mask, head_mask) attention_output = attention_outputs[0] intermediate_output = self.intermediate(attention_output) layer_output = self.output(intermediate_output, attention_output) outputs = (layer_output, ) + attention_outputs[ 1:] # add attentions if we output them if len(outputs) == 1: return outputs[0] return outputs class BertEncoder(nn.Module): def __init__(self, config): super(BertEncoder, self).__init__() self.output_attentions = config.output_attentions self.output_hidden_states = config.output_hidden_states self.grad_checkpointing = False self.layer = nn.ModuleList( [BertLayer(config) for _ in range(config.num_hidden_layers)]) def forward(self, hidden_states, attention_mask=None, head_mask=None): all_hidden_states = () all_attentions = () for i, layer_module in enumerate(self.layer): if self.output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states, ) if self.grad_checkpointing and not torch.jit.is_scripting(): layer_outputs = checkpoint(layer_module, hidden_states, attention_mask, head_mask[i]) else: layer_outputs = layer_module(hidden_states, attention_mask, head_mask[i]) if not isinstance(layer_outputs, tuple): layer_outputs = (layer_outputs, ) hidden_states = layer_outputs[0] if self.output_attentions: all_attentions = all_attentions + (layer_outputs[1], ) # Add last layer if self.output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states, ) outputs = (hidden_states, ) if self.output_hidden_states: outputs = outputs + (all_hidden_states, ) if self.output_attentions: outputs = outputs + (all_attentions, ) # last-layer hidden state, (all hidden states), (all attentions) return outputs class BertPreTrainedModel(nn.Module): base_model_prefix = 'bert' def __init__(self, config): super(BertPreTrainedModel, self).__init__() self.config = config def _init_weights(self, module): """Initialize the weights.""" if isinstance(module, (nn.Linear, nn.Embedding)): # Slightly different from the TF version # which uses truncated_normal for initialization # cf https://github.com/pytorch/pytorch/pull/5617 module.weight.data.normal_( mean=0.0, std=self.config.initializer_range) elif isinstance(module, nn.LayerNorm): module.bias.data.zero_() module.weight.data.fill_(1.0) if isinstance(module, nn.Linear) and module.bias is not None: module.bias.data.zero_() @MODELS.register_module() class BertModelCN(BertPreTrainedModel): """The BERT model implementation for Chinese CLIP.""" def __init__(self, config): config = BertConfig.from_dict(config) super(BertModelCN, self).__init__(config) self.embeddings = BertEmbeddings(config) self.encoder = BertEncoder(config) self.apply(self._init_weights) @torch.jit.ignore def set_grad_checkpointing(self, enable=True): if enable: assert not self.config.output_attentions, \ 'Grad checkpointing is currently conflict with ' \ 'output_attentions for BertEncoder, ' \ 'please set it to False in BertConfig' self.encoder.grad_checkpointing = enable def forward(self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None): if attention_mask is None: attention_mask = torch.ones_like(input_ids) if token_type_ids is None: token_type_ids = torch.zeros_like(input_ids) # We create a 3D attention mask from a 2D tensor mask. # Sizes are [batch_size, 1, 1, to_seq_length] # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length] # this attention mask is more simple than the triangular masking of causal attention # used in OpenAI GPT, we just need to prepare the broadcast dimension here. extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) # Since attention_mask is 1.0 for positions we want to attend and 0.0 for # masked positions, this operation will create a tensor which is 0.0 for # positions we want to attend and -10000.0 for masked positions. # Since we are adding it to the raw scores before the softmax, this is # effectively the same as removing these entirely. extended_attention_mask = extended_attention_mask.to( dtype=next(self.parameters()).dtype) # fp16 compatibility extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 # Prepare head mask if needed # 1.0 in head_mask indicate we keep the head # attention_probs has shape bsz x n_heads x N x N # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads] # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length] if head_mask is not None: if head_mask.dim() == 1: head_mask = head_mask.unsqueeze(0).unsqueeze(0).unsqueeze( -1).unsqueeze(-1) head_mask = head_mask.expand(self.config.num_hidden_layers, -1, -1, -1, -1) elif head_mask.dim() == 2: head_mask = head_mask.unsqueeze(1).unsqueeze(-1).unsqueeze( -1) # We can specify head_mask for each layer head_mask = head_mask.to(dtype=next(self.parameters( )).dtype) # switch to fload if need + fp16 compatibility else: head_mask = [None] * self.config.num_hidden_layers embedding_output = self.embeddings( input_ids, position_ids=position_ids, token_type_ids=token_type_ids) encoder_outputs = self.encoder( embedding_output, extended_attention_mask, head_mask=head_mask) sequence_output = encoder_outputs[0] # pooled_output = self.pooler(sequence_output) pooled_output = None # add hidden_states and attentions if they are here outputs = ( sequence_output, pooled_output, ) + encoder_outputs[1:] # sequence_output, pooled_output, (hidden_states), (attentions) return outputs