Spaces:
Sleeping
Sleeping
# coding=utf-8 | |
# Copyright 2019-present, the HuggingFace Inc. team, The Google AI Language Team and Facebook, Inc. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
""" | |
TF 2.0 DistilBERT model | |
""" | |
import warnings | |
import tensorflow as tf | |
from ...activations_tf import get_tf_activation | |
from ...file_utils import ( | |
MULTIPLE_CHOICE_DUMMY_INPUTS, | |
add_code_sample_docstrings, | |
add_start_docstrings, | |
add_start_docstrings_to_model_forward, | |
) | |
from ...modeling_tf_outputs import ( | |
TFBaseModelOutput, | |
TFMaskedLMOutput, | |
TFMultipleChoiceModelOutput, | |
TFQuestionAnsweringModelOutput, | |
TFSequenceClassifierOutput, | |
TFTokenClassifierOutput, | |
) | |
from ...modeling_tf_utils import ( | |
TFMaskedLanguageModelingLoss, | |
TFMultipleChoiceLoss, | |
TFPreTrainedModel, | |
TFQuestionAnsweringLoss, | |
TFSequenceClassificationLoss, | |
TFTokenClassificationLoss, | |
get_initializer, | |
input_processing, | |
keras_serializable, | |
shape_list, | |
) | |
from ...utils import logging | |
from .configuration_distilbert import DistilBertConfig | |
logger = logging.get_logger(__name__) | |
_CHECKPOINT_FOR_DOC = "distilbert-base-uncased" | |
_CONFIG_FOR_DOC = "DistilBertConfig" | |
_TOKENIZER_FOR_DOC = "DistilBertTokenizer" | |
TF_DISTILBERT_PRETRAINED_MODEL_ARCHIVE_LIST = [ | |
"distilbert-base-uncased", | |
"distilbert-base-uncased-distilled-squad", | |
"distilbert-base-cased", | |
"distilbert-base-cased-distilled-squad", | |
"distilbert-base-multilingual-cased", | |
"distilbert-base-uncased-finetuned-sst-2-english", | |
# See all DistilBERT models at https://huggingface.co/models?filter=distilbert | |
] | |
class TFEmbeddings(tf.keras.layers.Layer): | |
"""Construct the embeddings from word, position and token_type embeddings.""" | |
def __init__(self, config, **kwargs): | |
super().__init__(**kwargs) | |
self.vocab_size = config.vocab_size | |
self.dim = config.dim | |
self.initializer_range = config.initializer_range | |
self.max_position_embeddings = config.max_position_embeddings | |
self.embeddings_sum = tf.keras.layers.Add() | |
self.LayerNorm = tf.keras.layers.LayerNormalization(epsilon=1e-12, name="LayerNorm") | |
self.dropout = tf.keras.layers.Dropout(rate=config.dropout) | |
def build(self, input_shape: tf.TensorShape): | |
with tf.name_scope("word_embeddings"): | |
self.weight = self.add_weight( | |
name="weight", | |
shape=[self.vocab_size, self.dim], | |
initializer=get_initializer(initializer_range=self.initializer_range), | |
) | |
with tf.name_scope("position_embeddings"): | |
self.position_embeddings = self.add_weight( | |
name="embeddings", | |
shape=[self.max_position_embeddings, self.dim], | |
initializer=get_initializer(initializer_range=self.initializer_range), | |
) | |
super().build(input_shape) | |
def call(self, input_ids=None, position_ids=None, inputs_embeds=None, training=False): | |
""" | |
Applies embedding based on inputs tensor. | |
Returns: | |
final_embeddings (:obj:`tf.Tensor`): output embedding tensor. | |
""" | |
assert not (input_ids is None and inputs_embeds is None) | |
if input_ids is not None: | |
inputs_embeds = tf.gather(params=self.weight, indices=input_ids) | |
input_shape = shape_list(inputs_embeds)[:-1] | |
if position_ids is None: | |
position_ids = tf.expand_dims(tf.range(start=0, limit=input_shape[-1]), axis=0) | |
position_embeds = tf.gather(params=self.position_embeddings, indices=position_ids) | |
position_embeds = tf.tile(input=position_embeds, multiples=(input_shape[0], 1, 1)) | |
final_embeddings = self.embeddings_sum(inputs=[inputs_embeds, position_embeds]) | |
final_embeddings = self.LayerNorm(inputs=final_embeddings) | |
final_embeddings = self.dropout(inputs=final_embeddings, training=training) | |
return final_embeddings | |
class TFMultiHeadSelfAttention(tf.keras.layers.Layer): | |
def __init__(self, config, **kwargs): | |
super().__init__(**kwargs) | |
self.n_heads = config.n_heads | |
self.dim = config.dim | |
self.dropout = tf.keras.layers.Dropout(config.attention_dropout) | |
self.output_attentions = config.output_attentions | |
assert self.dim % self.n_heads == 0, f"Hidden size {self.dim} not dividable by number of heads {self.n_heads}" | |
self.q_lin = tf.keras.layers.Dense( | |
config.dim, kernel_initializer=get_initializer(config.initializer_range), name="q_lin" | |
) | |
self.k_lin = tf.keras.layers.Dense( | |
config.dim, kernel_initializer=get_initializer(config.initializer_range), name="k_lin" | |
) | |
self.v_lin = tf.keras.layers.Dense( | |
config.dim, kernel_initializer=get_initializer(config.initializer_range), name="v_lin" | |
) | |
self.out_lin = tf.keras.layers.Dense( | |
config.dim, kernel_initializer=get_initializer(config.initializer_range), name="out_lin" | |
) | |
self.pruned_heads = set() | |
def prune_heads(self, heads): | |
raise NotImplementedError | |
def call(self, query, key, value, mask, head_mask, output_attentions, training=False): | |
""" | |
Parameters: | |
query: tf.Tensor(bs, seq_length, dim) | |
key: tf.Tensor(bs, seq_length, dim) | |
value: tf.Tensor(bs, seq_length, dim) | |
mask: tf.Tensor(bs, seq_length) | |
Returns: | |
weights: tf.Tensor(bs, n_heads, seq_length, seq_length) Attention weights context: tf.Tensor(bs, | |
seq_length, dim) Contextualized layer. Optional: only if `output_attentions=True` | |
""" | |
bs, q_length, dim = shape_list(query) | |
k_length = shape_list(key)[1] | |
# assert dim == self.dim, f'Dimensions do not match: {dim} input vs {self.dim} configured' | |
# assert key.size() == value.size() | |
dim_per_head = tf.math.divide(self.dim, self.n_heads) | |
dim_per_head = tf.cast(dim_per_head, dtype=tf.int32) | |
mask_reshape = [bs, 1, 1, k_length] | |
def shape(x): | |
"""separate heads""" | |
return tf.transpose(tf.reshape(x, (bs, -1, self.n_heads, dim_per_head)), perm=(0, 2, 1, 3)) | |
def unshape(x): | |
"""group heads""" | |
return tf.reshape(tf.transpose(x, perm=(0, 2, 1, 3)), (bs, -1, self.n_heads * dim_per_head)) | |
q = shape(self.q_lin(query)) # (bs, n_heads, q_length, dim_per_head) | |
k = shape(self.k_lin(key)) # (bs, n_heads, k_length, dim_per_head) | |
v = shape(self.v_lin(value)) # (bs, n_heads, k_length, dim_per_head) | |
q = tf.cast(q, dtype=tf.float32) | |
q = tf.multiply(q, tf.math.rsqrt(tf.cast(dim_per_head, dtype=tf.float32))) | |
k = tf.cast(k, dtype=q.dtype) | |
scores = tf.matmul(q, k, transpose_b=True) # (bs, n_heads, q_length, k_length) | |
mask = tf.reshape(mask, mask_reshape) # (bs, n_heads, qlen, klen) | |
# scores.masked_fill_(mask, -float('inf')) # (bs, n_heads, q_length, k_length) | |
mask = tf.cast(mask, dtype=scores.dtype) | |
scores = scores - 1e30 * (1.0 - mask) | |
weights = tf.nn.softmax(scores, axis=-1) # (bs, n_heads, qlen, klen) | |
weights = self.dropout(weights, training=training) # (bs, n_heads, qlen, klen) | |
# Mask heads if we want to | |
if head_mask is not None: | |
weights = weights * head_mask | |
context = tf.matmul(weights, v) # (bs, n_heads, qlen, dim_per_head) | |
context = unshape(context) # (bs, q_length, dim) | |
context = self.out_lin(context) # (bs, q_length, dim) | |
if output_attentions: | |
return (context, weights) | |
else: | |
return (context,) | |
class TFFFN(tf.keras.layers.Layer): | |
def __init__(self, config, **kwargs): | |
super().__init__(**kwargs) | |
self.dropout = tf.keras.layers.Dropout(config.dropout) | |
self.lin1 = tf.keras.layers.Dense( | |
config.hidden_dim, kernel_initializer=get_initializer(config.initializer_range), name="lin1" | |
) | |
self.lin2 = tf.keras.layers.Dense( | |
config.dim, kernel_initializer=get_initializer(config.initializer_range), name="lin2" | |
) | |
assert config.activation in ["relu", "gelu"], f"activation ({config.activation}) must be in ['relu', 'gelu']" | |
self.activation = get_tf_activation(config.activation) | |
def call(self, input, training=False): | |
x = self.lin1(input) | |
x = self.activation(x) | |
x = self.lin2(x) | |
x = self.dropout(x, training=training) | |
return x | |
class TFTransformerBlock(tf.keras.layers.Layer): | |
def __init__(self, config, **kwargs): | |
super().__init__(**kwargs) | |
self.n_heads = config.n_heads | |
self.dim = config.dim | |
self.hidden_dim = config.hidden_dim | |
self.dropout = tf.keras.layers.Dropout(config.dropout) | |
self.activation = config.activation | |
self.output_attentions = config.output_attentions | |
assert ( | |
config.dim % config.n_heads == 0 | |
), f"Hidden size {config.dim} not dividable by number of heads {config.n_heads}" | |
self.attention = TFMultiHeadSelfAttention(config, name="attention") | |
self.sa_layer_norm = tf.keras.layers.LayerNormalization(epsilon=1e-12, name="sa_layer_norm") | |
self.ffn = TFFFN(config, name="ffn") | |
self.output_layer_norm = tf.keras.layers.LayerNormalization(epsilon=1e-12, name="output_layer_norm") | |
def call(self, x, attn_mask, head_mask, output_attentions, training=False): # removed: src_enc=None, src_len=None | |
""" | |
Parameters: | |
x: tf.Tensor(bs, seq_length, dim) | |
attn_mask: tf.Tensor(bs, seq_length) | |
Outputs: sa_weights: tf.Tensor(bs, n_heads, seq_length, seq_length) The attention weights ffn_output: | |
tf.Tensor(bs, seq_length, dim) The output of the transformer block contextualization. | |
""" | |
# Self-Attention | |
sa_output = self.attention(x, x, x, attn_mask, head_mask, output_attentions, training=training) | |
if output_attentions: | |
sa_output, sa_weights = sa_output # (bs, seq_length, dim), (bs, n_heads, seq_length, seq_length) | |
else: # To handle these `output_attentions` or `output_hidden_states` cases returning tuples | |
# assert type(sa_output) == tuple | |
sa_output = sa_output[0] | |
sa_output = self.sa_layer_norm(sa_output + x) # (bs, seq_length, dim) | |
# Feed Forward Network | |
ffn_output = self.ffn(sa_output, training=training) # (bs, seq_length, dim) | |
ffn_output = self.output_layer_norm(ffn_output + sa_output) # (bs, seq_length, dim) | |
output = (ffn_output,) | |
if output_attentions: | |
output = (sa_weights,) + output | |
return output | |
class TFTransformer(tf.keras.layers.Layer): | |
def __init__(self, config, **kwargs): | |
super().__init__(**kwargs) | |
self.n_layers = config.n_layers | |
self.output_hidden_states = config.output_hidden_states | |
self.output_attentions = config.output_attentions | |
self.layer = [TFTransformerBlock(config, name=f"layer_._{i}") for i in range(config.n_layers)] | |
def call(self, x, attn_mask, head_mask, output_attentions, output_hidden_states, return_dict, training=False): | |
# docstyle-ignore | |
""" | |
Parameters: | |
x: tf.Tensor(bs, seq_length, dim) Input sequence embedded. | |
attn_mask: tf.Tensor(bs, seq_length) Attention mask on the sequence. | |
Returns: | |
hidden_state: tf.Tensor(bs, seq_length, dim) | |
Sequence of hidden states in the last (top) layer | |
all_hidden_states: Tuple[tf.Tensor(bs, seq_length, dim)] | |
Tuple of length n_layers with the hidden states from each layer. | |
Optional: only if output_hidden_states=True | |
all_attentions: Tuple[tf.Tensor(bs, n_heads, seq_length, seq_length)] | |
Tuple of length n_layers with the attention weights from each layer | |
Optional: only if output_attentions=True | |
""" | |
all_hidden_states = () if output_hidden_states else None | |
all_attentions = () if output_attentions else None | |
hidden_state = x | |
for i, layer_module in enumerate(self.layer): | |
if output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_state,) | |
layer_outputs = layer_module(hidden_state, attn_mask, head_mask[i], output_attentions, training=training) | |
hidden_state = layer_outputs[-1] | |
if output_attentions: | |
assert len(layer_outputs) == 2 | |
attentions = layer_outputs[0] | |
all_attentions = all_attentions + (attentions,) | |
else: | |
assert len(layer_outputs) == 1, f"Incorrect number of outputs {len(layer_outputs)} instead of 1" | |
# Add last layer | |
if output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_state,) | |
if not return_dict: | |
return tuple(v for v in [hidden_state, all_hidden_states, all_attentions] if v is not None) | |
return TFBaseModelOutput( | |
last_hidden_state=hidden_state, hidden_states=all_hidden_states, attentions=all_attentions | |
) | |
class TFDistilBertMainLayer(tf.keras.layers.Layer): | |
config_class = DistilBertConfig | |
def __init__(self, config, **kwargs): | |
super().__init__(**kwargs) | |
self.config = config | |
self.num_hidden_layers = config.num_hidden_layers | |
self.output_attentions = config.output_attentions | |
self.output_hidden_states = config.output_hidden_states | |
self.return_dict = config.use_return_dict | |
self.embeddings = TFEmbeddings(config, name="embeddings") # Embeddings | |
self.transformer = TFTransformer(config, name="transformer") # Encoder | |
def get_input_embeddings(self): | |
return self.embeddings | |
def set_input_embeddings(self, value): | |
self.embeddings.weight = value | |
self.embeddings.vocab_size = value.shape[0] | |
def _prune_heads(self, heads_to_prune): | |
raise NotImplementedError | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
training=False, | |
**kwargs, | |
): | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
if inputs["input_ids"] is not None and inputs["inputs_embeds"] is not None: | |
raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") | |
elif inputs["input_ids"] is not None: | |
input_shape = shape_list(inputs["input_ids"]) | |
elif inputs["inputs_embeds"] is not None: | |
input_shape = shape_list(inputs["inputs_embeds"])[:-1] | |
else: | |
raise ValueError("You have to specify either input_ids or inputs_embeds") | |
if inputs["attention_mask"] is None: | |
inputs["attention_mask"] = tf.ones(input_shape) # (bs, seq_length) | |
inputs["attention_mask"] = tf.cast(inputs["attention_mask"], dtype=tf.float32) | |
# Prepare head mask if needed | |
# 1.0 in head_mask indicate we keep the head | |
# attention_probs has shape bsz x n_heads x N x N | |
# input head_mask has shape [num_heads] or [num_hidden_layers x num_heads] | |
# and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length] | |
if inputs["head_mask"] is not None: | |
raise NotImplementedError | |
else: | |
inputs["head_mask"] = [None] * self.num_hidden_layers | |
embedding_output = self.embeddings( | |
inputs["input_ids"], inputs_embeds=inputs["inputs_embeds"] | |
) # (bs, seq_length, dim) | |
tfmr_output = self.transformer( | |
embedding_output, | |
inputs["attention_mask"], | |
inputs["head_mask"], | |
inputs["output_attentions"], | |
inputs["output_hidden_states"], | |
inputs["return_dict"], | |
training=inputs["training"], | |
) | |
return tfmr_output # last-layer hidden-state, (all hidden_states), (all attentions) | |
# INTERFACE FOR ENCODER AND TASK SPECIFIC MODEL # | |
class TFDistilBertPreTrainedModel(TFPreTrainedModel): | |
""" | |
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained | |
models. | |
""" | |
config_class = DistilBertConfig | |
base_model_prefix = "distilbert" | |
def serving(self, inputs): | |
output = self.call(inputs) | |
return self.serving_output(output) | |
DISTILBERT_START_DOCSTRING = r""" | |
This model inherits from :class:`~transformers.TFPreTrainedModel`. Check the superclass documentation for the | |
generic methods the library implements for all its model (such as downloading or saving, resizing the input | |
embeddings, pruning heads etc.) | |
This model is also a `tf.keras.Model <https://www.tensorflow.org/api_docs/python/tf/keras/Model>`__ subclass. Use | |
it as a regular TF 2.0 Keras Model and refer to the TF 2.0 documentation for all matter related to general usage | |
and behavior. | |
.. note:: | |
TF 2.0 models accepts two formats as inputs: | |
- having all inputs as keyword arguments (like PyTorch models), or | |
- having all inputs as a list, tuple or dict in the first positional arguments. | |
This second option is useful when using :meth:`tf.keras.Model.fit` method which currently requires having all | |
the tensors in the first argument of the model call function: :obj:`model(inputs)`. | |
If you choose this second option, there are three possibilities you can use to gather all the input Tensors in | |
the first positional argument : | |
- a single Tensor with :obj:`input_ids` only and nothing else: :obj:`model(inputs_ids)` | |
- a list of varying length with one or several input Tensors IN THE ORDER given in the docstring: | |
:obj:`model([input_ids, attention_mask])` | |
- a dictionary with one or several input Tensors associated to the input names given in the docstring: | |
:obj:`model({"input_ids": input_ids})` | |
Parameters: | |
config (:class:`~transformers.DistilBertConfig`): Model configuration class with all the parameters of the model. | |
Initializing with a config file does not load the weights associated with the model, only the | |
configuration. Check out the :meth:`~transformers.PreTrainedModel.from_pretrained` method to load the model | |
weights. | |
""" | |
DISTILBERT_INPUTS_DOCSTRING = r""" | |
Args: | |
input_ids (:obj:`Numpy array` or :obj:`tf.Tensor` of shape :obj:`({0})`): | |
Indices of input sequence tokens in the vocabulary. | |
Indices can be obtained using :class:`~transformers.DistilBertTokenizer`. See | |
:func:`transformers.PreTrainedTokenizer.__call__` and :func:`transformers.PreTrainedTokenizer.encode` for | |
details. | |
`What are input IDs? <../glossary.html#input-ids>`__ | |
attention_mask (:obj:`Numpy array` or :obj:`tf.Tensor` of shape :obj:`({0})`, `optional`): | |
Mask to avoid performing attention on padding token indices. Mask values selected in ``[0, 1]``: | |
- 1 for tokens that are **not masked**, | |
- 0 for tokens that are **masked**. | |
`What are attention masks? <../glossary.html#attention-mask>`__ | |
head_mask (:obj:`Numpy array` or :obj:`tf.Tensor` of shape :obj:`(num_heads,)` or :obj:`(num_layers, num_heads)`, `optional`): | |
Mask to nullify selected heads of the self-attention modules. Mask values selected in ``[0, 1]``: | |
- 1 indicates the head is **not masked**, | |
- 0 indicates the head is **masked**. | |
inputs_embeds (:obj:`tf.Tensor` of shape :obj:`({0}, hidden_size)`, `optional`): | |
Optionally, instead of passing :obj:`input_ids` you can choose to directly pass an embedded representation. | |
This is useful if you want more control over how to convert :obj:`input_ids` indices into associated | |
vectors than the model's internal embedding lookup matrix. | |
output_attentions (:obj:`bool`, `optional`): | |
Whether or not to return the attentions tensors of all attention layers. See ``attentions`` under returned | |
tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the | |
config will be used instead. | |
output_hidden_states (:obj:`bool`, `optional`): | |
Whether or not to return the hidden states of all layers. See ``hidden_states`` under returned tensors for | |
more detail. This argument can be used only in eager mode, in graph mode the value in the config will be | |
used instead. | |
return_dict (:obj:`bool`, `optional`): | |
Whether or not to return a :class:`~transformers.file_utils.ModelOutput` instead of a plain tuple. This | |
argument can be used in eager mode, in graph mode the value will always be set to True. | |
training (:obj:`bool`, `optional`, defaults to :obj:`False`): | |
Whether or not to use the model in training mode (some modules like dropout modules have different | |
behaviors between training and evaluation). | |
""" | |
class TFDistilBertModel(TFDistilBertPreTrainedModel): | |
def __init__(self, config, *inputs, **kwargs): | |
super().__init__(config, *inputs, **kwargs) | |
self.distilbert = TFDistilBertMainLayer(config, name="distilbert") # Embeddings | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
training=False, | |
**kwargs, | |
): | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
outputs = self.distilbert( | |
input_ids=inputs["input_ids"], | |
attention_mask=inputs["attention_mask"], | |
head_mask=inputs["head_mask"], | |
inputs_embeds=inputs["inputs_embeds"], | |
output_attentions=inputs["output_attentions"], | |
output_hidden_states=inputs["output_hidden_states"], | |
return_dict=inputs["return_dict"], | |
training=inputs["training"], | |
) | |
return outputs | |
def serving_output(self, output): | |
hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
return TFBaseModelOutput(last_hidden_state=output.last_hidden_state, hidden_states=hs, attentions=attns) | |
class TFDistilBertLMHead(tf.keras.layers.Layer): | |
def __init__(self, config, input_embeddings, **kwargs): | |
super().__init__(**kwargs) | |
self.vocab_size = config.vocab_size | |
self.dim = config.dim | |
# The output weights are the same as the input embeddings, but there is | |
# an output-only bias for each token. | |
self.input_embeddings = input_embeddings | |
def build(self, input_shape): | |
self.bias = self.add_weight(shape=(self.vocab_size,), initializer="zeros", trainable=True, name="bias") | |
super().build(input_shape) | |
def get_output_embeddings(self): | |
return self.input_embeddings | |
def set_output_embeddings(self, value): | |
self.input_embeddings.weight = value | |
self.input_embeddings.vocab_size = shape_list(value)[0] | |
def get_bias(self): | |
return {"bias": self.bias} | |
def set_bias(self, value): | |
self.bias = value["bias"] | |
self.vocab_size = shape_list(value["bias"])[0] | |
def call(self, hidden_states): | |
seq_length = shape_list(tensor=hidden_states)[1] | |
hidden_states = tf.reshape(tensor=hidden_states, shape=[-1, self.dim]) | |
hidden_states = tf.matmul(a=hidden_states, b=self.input_embeddings.weight, transpose_b=True) | |
hidden_states = tf.reshape(tensor=hidden_states, shape=[-1, seq_length, self.vocab_size]) | |
hidden_states = tf.nn.bias_add(value=hidden_states, bias=self.bias) | |
return hidden_states | |
class TFDistilBertForMaskedLM(TFDistilBertPreTrainedModel, TFMaskedLanguageModelingLoss): | |
def __init__(self, config, *inputs, **kwargs): | |
super().__init__(config, *inputs, **kwargs) | |
self.vocab_size = config.vocab_size | |
self.distilbert = TFDistilBertMainLayer(config, name="distilbert") | |
self.vocab_transform = tf.keras.layers.Dense( | |
config.dim, kernel_initializer=get_initializer(config.initializer_range), name="vocab_transform" | |
) | |
self.act = get_tf_activation("gelu") | |
self.vocab_layer_norm = tf.keras.layers.LayerNormalization(epsilon=1e-12, name="vocab_layer_norm") | |
self.vocab_projector = TFDistilBertLMHead(config, self.distilbert.embeddings, name="vocab_projector") | |
def get_lm_head(self): | |
return self.vocab_projector | |
def get_prefix_bias_name(self): | |
warnings.warn("The method get_prefix_bias_name is deprecated. Please use `get_bias` instead.", FutureWarning) | |
return self.name + "/" + self.vocab_projector.name | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
labels=None, | |
training=False, | |
**kwargs, | |
): | |
r""" | |
labels (:obj:`tf.Tensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): | |
Labels for computing the masked language modeling loss. Indices should be in ``[-100, 0, ..., | |
config.vocab_size]`` (see ``input_ids`` docstring) Tokens with indices set to ``-100`` are ignored | |
(masked), the loss is only computed for the tokens with labels in ``[0, ..., config.vocab_size]`` | |
""" | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
labels=labels, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
distilbert_output = self.distilbert( | |
input_ids=inputs["input_ids"], | |
attention_mask=inputs["attention_mask"], | |
head_mask=inputs["head_mask"], | |
inputs_embeds=inputs["inputs_embeds"], | |
output_attentions=inputs["output_attentions"], | |
output_hidden_states=inputs["output_hidden_states"], | |
return_dict=inputs["return_dict"], | |
training=inputs["training"], | |
) | |
hidden_states = distilbert_output[0] # (bs, seq_length, dim) | |
prediction_logits = self.vocab_transform(hidden_states) # (bs, seq_length, dim) | |
prediction_logits = self.act(prediction_logits) # (bs, seq_length, dim) | |
prediction_logits = self.vocab_layer_norm(prediction_logits) # (bs, seq_length, dim) | |
prediction_logits = self.vocab_projector(prediction_logits) | |
loss = None if inputs["labels"] is None else self.compute_loss(inputs["labels"], prediction_logits) | |
if not inputs["return_dict"]: | |
output = (prediction_logits,) + distilbert_output[1:] | |
return ((loss,) + output) if loss is not None else output | |
return TFMaskedLMOutput( | |
loss=loss, | |
logits=prediction_logits, | |
hidden_states=distilbert_output.hidden_states, | |
attentions=distilbert_output.attentions, | |
) | |
# Copied from transformers.models.bert.modeling_tf_bert.TFBertForMaskedLM.serving_output | |
def serving_output(self, output: TFMaskedLMOutput) -> TFMaskedLMOutput: | |
hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
return TFMaskedLMOutput(logits=output.logits, hidden_states=hs, attentions=attns) | |
class TFDistilBertForSequenceClassification(TFDistilBertPreTrainedModel, TFSequenceClassificationLoss): | |
def __init__(self, config, *inputs, **kwargs): | |
super().__init__(config, *inputs, **kwargs) | |
self.num_labels = config.num_labels | |
self.distilbert = TFDistilBertMainLayer(config, name="distilbert") | |
self.pre_classifier = tf.keras.layers.Dense( | |
config.dim, | |
kernel_initializer=get_initializer(config.initializer_range), | |
activation="relu", | |
name="pre_classifier", | |
) | |
self.classifier = tf.keras.layers.Dense( | |
config.num_labels, kernel_initializer=get_initializer(config.initializer_range), name="classifier" | |
) | |
self.dropout = tf.keras.layers.Dropout(config.seq_classif_dropout) | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
labels=None, | |
training=False, | |
**kwargs, | |
): | |
r""" | |
labels (:obj:`tf.Tensor` of shape :obj:`(batch_size,)`, `optional`): | |
Labels for computing the sequence classification/regression loss. Indices should be in ``[0, ..., | |
config.num_labels - 1]``. If ``config.num_labels == 1`` a regression loss is computed (Mean-Square loss), | |
If ``config.num_labels > 1`` a classification loss is computed (Cross-Entropy). | |
""" | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
labels=labels, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
distilbert_output = self.distilbert( | |
input_ids=inputs["input_ids"], | |
attention_mask=inputs["attention_mask"], | |
head_mask=inputs["head_mask"], | |
inputs_embeds=inputs["inputs_embeds"], | |
output_attentions=inputs["output_attentions"], | |
output_hidden_states=inputs["output_hidden_states"], | |
return_dict=inputs["return_dict"], | |
training=inputs["training"], | |
) | |
hidden_state = distilbert_output[0] # (bs, seq_len, dim) | |
pooled_output = hidden_state[:, 0] # (bs, dim) | |
pooled_output = self.pre_classifier(pooled_output) # (bs, dim) | |
pooled_output = self.dropout(pooled_output, training=inputs["training"]) # (bs, dim) | |
logits = self.classifier(pooled_output) # (bs, dim) | |
loss = None if inputs["labels"] is None else self.compute_loss(inputs["labels"], logits) | |
if not inputs["return_dict"]: | |
output = (logits,) + distilbert_output[1:] | |
return ((loss,) + output) if loss is not None else output | |
return TFSequenceClassifierOutput( | |
loss=loss, | |
logits=logits, | |
hidden_states=distilbert_output.hidden_states, | |
attentions=distilbert_output.attentions, | |
) | |
# Copied from transformers.models.bert.modeling_tf_bert.TFBertForSequenceClassification.serving_output | |
def serving_output(self, output: TFSequenceClassifierOutput) -> TFSequenceClassifierOutput: | |
hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
return TFSequenceClassifierOutput(logits=output.logits, hidden_states=hs, attentions=attns) | |
class TFDistilBertForTokenClassification(TFDistilBertPreTrainedModel, TFTokenClassificationLoss): | |
def __init__(self, config, *inputs, **kwargs): | |
super().__init__(config, *inputs, **kwargs) | |
self.num_labels = config.num_labels | |
self.distilbert = TFDistilBertMainLayer(config, name="distilbert") | |
self.dropout = tf.keras.layers.Dropout(config.dropout) | |
self.classifier = tf.keras.layers.Dense( | |
config.num_labels, kernel_initializer=get_initializer(config.initializer_range), name="classifier" | |
) | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
labels=None, | |
training=False, | |
**kwargs, | |
): | |
r""" | |
labels (:obj:`tf.Tensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): | |
Labels for computing the token classification loss. Indices should be in ``[0, ..., config.num_labels - | |
1]``. | |
""" | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
labels=labels, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
outputs = self.distilbert( | |
input_ids=inputs["input_ids"], | |
attention_mask=inputs["attention_mask"], | |
head_mask=inputs["head_mask"], | |
inputs_embeds=inputs["inputs_embeds"], | |
output_attentions=inputs["output_attentions"], | |
output_hidden_states=inputs["output_hidden_states"], | |
return_dict=inputs["return_dict"], | |
training=inputs["training"], | |
) | |
sequence_output = outputs[0] | |
sequence_output = self.dropout(sequence_output, training=inputs["training"]) | |
logits = self.classifier(sequence_output) | |
loss = None if inputs["labels"] is None else self.compute_loss(inputs["labels"], logits) | |
if not inputs["return_dict"]: | |
output = (logits,) + outputs[1:] | |
return ((loss,) + output) if loss is not None else output | |
return TFTokenClassifierOutput( | |
loss=loss, | |
logits=logits, | |
hidden_states=outputs.hidden_states, | |
attentions=outputs.attentions, | |
) | |
# Copied from transformers.models.bert.modeling_tf_bert.TFBertForTokenClassification.serving_output | |
def serving_output(self, output: TFTokenClassifierOutput) -> TFTokenClassifierOutput: | |
hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
return TFTokenClassifierOutput(logits=output.logits, hidden_states=hs, attentions=attns) | |
class TFDistilBertForMultipleChoice(TFDistilBertPreTrainedModel, TFMultipleChoiceLoss): | |
def __init__(self, config, *inputs, **kwargs): | |
super().__init__(config, *inputs, **kwargs) | |
self.distilbert = TFDistilBertMainLayer(config, name="distilbert") | |
self.dropout = tf.keras.layers.Dropout(config.seq_classif_dropout) | |
self.pre_classifier = tf.keras.layers.Dense( | |
config.dim, | |
kernel_initializer=get_initializer(config.initializer_range), | |
activation="relu", | |
name="pre_classifier", | |
) | |
self.classifier = tf.keras.layers.Dense( | |
1, kernel_initializer=get_initializer(config.initializer_range), name="classifier" | |
) | |
def dummy_inputs(self): | |
""" | |
Dummy inputs to build the network. | |
Returns: | |
tf.Tensor with dummy inputs | |
""" | |
return {"input_ids": tf.constant(MULTIPLE_CHOICE_DUMMY_INPUTS)} | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
labels=None, | |
training=False, | |
**kwargs, | |
): | |
r""" | |
labels (:obj:`tf.Tensor` of shape :obj:`(batch_size,)`, `optional`): | |
Labels for computing the multiple choice classification loss. Indices should be in ``[0, ..., | |
num_choices]`` where :obj:`num_choices` is the size of the second dimension of the input tensors. (See | |
:obj:`input_ids` above) | |
""" | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
labels=labels, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
if inputs["input_ids"] is not None: | |
num_choices = shape_list(inputs["input_ids"])[1] | |
seq_length = shape_list(inputs["input_ids"])[2] | |
else: | |
num_choices = shape_list(inputs["inputs_embeds"])[1] | |
seq_length = shape_list(inputs["inputs_embeds"])[2] | |
flat_input_ids = tf.reshape(inputs["input_ids"], (-1, seq_length)) if inputs["input_ids"] is not None else None | |
flat_attention_mask = ( | |
tf.reshape(inputs["attention_mask"], (-1, seq_length)) if inputs["attention_mask"] is not None else None | |
) | |
flat_inputs_embeds = ( | |
tf.reshape(inputs["inputs_embeds"], (-1, seq_length, shape_list(inputs["inputs_embeds"])[3])) | |
if inputs["inputs_embeds"] is not None | |
else None | |
) | |
distilbert_output = self.distilbert( | |
flat_input_ids, | |
flat_attention_mask, | |
inputs["head_mask"], | |
flat_inputs_embeds, | |
inputs["output_attentions"], | |
inputs["output_hidden_states"], | |
return_dict=inputs["return_dict"], | |
training=inputs["training"], | |
) | |
hidden_state = distilbert_output[0] # (bs, seq_len, dim) | |
pooled_output = hidden_state[:, 0] # (bs, dim) | |
pooled_output = self.pre_classifier(pooled_output) # (bs, dim) | |
pooled_output = self.dropout(pooled_output, training=inputs["training"]) # (bs, dim) | |
logits = self.classifier(pooled_output) | |
reshaped_logits = tf.reshape(logits, (-1, num_choices)) | |
loss = None if inputs["labels"] is None else self.compute_loss(inputs["labels"], reshaped_logits) | |
if not inputs["return_dict"]: | |
output = (reshaped_logits,) + distilbert_output[1:] | |
return ((loss,) + output) if loss is not None else output | |
return TFMultipleChoiceModelOutput( | |
loss=loss, | |
logits=reshaped_logits, | |
hidden_states=distilbert_output.hidden_states, | |
attentions=distilbert_output.attentions, | |
) | |
def serving(self, inputs): | |
output = self.call(inputs) | |
return self.serving_output(output) | |
# Copied from transformers.models.bert.modeling_tf_bert.TFBertForMultipleChoice.serving_output | |
def serving_output(self, output: TFMultipleChoiceModelOutput) -> TFMultipleChoiceModelOutput: | |
hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
return TFMultipleChoiceModelOutput(logits=output.logits, hidden_states=hs, attentions=attns) | |
class TFDistilBertForQuestionAnswering(TFDistilBertPreTrainedModel, TFQuestionAnsweringLoss): | |
def __init__(self, config, *inputs, **kwargs): | |
super().__init__(config, *inputs, **kwargs) | |
self.distilbert = TFDistilBertMainLayer(config, name="distilbert") | |
self.qa_outputs = tf.keras.layers.Dense( | |
config.num_labels, kernel_initializer=get_initializer(config.initializer_range), name="qa_outputs" | |
) | |
assert config.num_labels == 2, f"Incorrect number of labels {config.num_labels} instead of 2" | |
self.dropout = tf.keras.layers.Dropout(config.qa_dropout) | |
def call( | |
self, | |
input_ids=None, | |
attention_mask=None, | |
head_mask=None, | |
inputs_embeds=None, | |
output_attentions=None, | |
output_hidden_states=None, | |
return_dict=None, | |
start_positions=None, | |
end_positions=None, | |
training=False, | |
**kwargs, | |
): | |
r""" | |
start_positions (:obj:`tf.Tensor` of shape :obj:`(batch_size,)`, `optional`): | |
Labels for position (index) of the start of the labelled span for computing the token classification loss. | |
Positions are clamped to the length of the sequence (:obj:`sequence_length`). Position outside of the | |
sequence are not taken into account for computing the loss. | |
end_positions (:obj:`tf.Tensor` of shape :obj:`(batch_size,)`, `optional`): | |
Labels for position (index) of the end of the labelled span for computing the token classification loss. | |
Positions are clamped to the length of the sequence (:obj:`sequence_length`). Position outside of the | |
sequence are not taken into account for computing the loss. | |
""" | |
inputs = input_processing( | |
func=self.call, | |
config=self.config, | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
head_mask=head_mask, | |
inputs_embeds=inputs_embeds, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
start_positions=start_positions, | |
end_positions=end_positions, | |
training=training, | |
kwargs_call=kwargs, | |
) | |
distilbert_output = self.distilbert( | |
input_ids=inputs["input_ids"], | |
attention_mask=inputs["attention_mask"], | |
head_mask=inputs["head_mask"], | |
inputs_embeds=inputs["inputs_embeds"], | |
output_attentions=inputs["output_attentions"], | |
output_hidden_states=inputs["output_hidden_states"], | |
return_dict=inputs["return_dict"], | |
training=inputs["training"], | |
) | |
hidden_states = distilbert_output[0] # (bs, max_query_len, dim) | |
hidden_states = self.dropout(hidden_states, training=inputs["training"]) # (bs, max_query_len, dim) | |
logits = self.qa_outputs(hidden_states) # (bs, max_query_len, 2) | |
start_logits, end_logits = tf.split(logits, 2, axis=-1) | |
start_logits = tf.squeeze(start_logits, axis=-1) | |
end_logits = tf.squeeze(end_logits, axis=-1) | |
loss = None | |
if inputs["start_positions"] is not None and inputs["end_positions"] is not None: | |
labels = {"start_position": inputs["start_positions"]} | |
labels["end_position"] = inputs["end_positions"] | |
loss = self.compute_loss(labels, (start_logits, end_logits)) | |
if not inputs["return_dict"]: | |
output = (start_logits, end_logits) + distilbert_output[1:] | |
return ((loss,) + output) if loss is not None else output | |
return TFQuestionAnsweringModelOutput( | |
loss=loss, | |
start_logits=start_logits, | |
end_logits=end_logits, | |
hidden_states=distilbert_output.hidden_states, | |
attentions=distilbert_output.attentions, | |
) | |
# Copied from transformers.models.bert.modeling_tf_bert.TFBertForQuestionAnswering.serving_output | |
def serving_output(self, output: TFQuestionAnsweringModelOutput) -> TFQuestionAnsweringModelOutput: | |
hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
return TFQuestionAnsweringModelOutput( | |
start_logits=output.start_logits, end_logits=output.end_logits, hidden_states=hs, attentions=attns | |
) | |