# NOTE: page-scrape residue kept as a comment (not part of the module): "Spaces: Sleeping / Sleeping"
# coding=utf-8
# Copyright 2018 The OpenAI Team Authors and HuggingFace Inc. team.
# Copyright (c) 2018, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" TF 2.0 OpenAI GPT model."""
from dataclasses import dataclass
from typing import Optional, Tuple

import tensorflow as tf

from ...activations_tf import get_tf_activation
from ...file_utils import (
    ModelOutput,
    add_code_sample_docstrings,
    add_start_docstrings,
    add_start_docstrings_to_model_forward,
    replace_return_docstrings,
)
from ...modeling_tf_outputs import TFBaseModelOutput, TFCausalLMOutput, TFSequenceClassifierOutput
from ...modeling_tf_utils import (
    TFCausalLanguageModelingLoss,
    TFConv1D,
    TFPreTrainedModel,
    TFSequenceClassificationLoss,
    TFSequenceSummary,
    TFSharedEmbeddings,
    get_initializer,
    input_processing,
    keras_serializable,
    shape_list,
)
from ...utils import logging
from .configuration_openai import OpenAIGPTConfig
# Module-level logger obtained from the library's logging helper.
logger = logging.get_logger(__name__)

# Checkpoint / config / tokenizer names; presumably consumed by the docstring helpers imported
# at the top of the file (no use is visible in this module as shown) -- TODO confirm.
_CHECKPOINT_FOR_DOC = "openai-gpt"
_CONFIG_FOR_DOC = "OpenAIGPTConfig"
_TOKENIZER_FOR_DOC = "OpenAIGPTTokenizer"

TF_OPENAI_GPT_PRETRAINED_MODEL_ARCHIVE_LIST = [
    "openai-gpt",
    # See all OpenAI GPT models at https://huggingface.co/models?filter=openai-gpt
]
class TFAttention(tf.keras.layers.Layer):
    """Causal multi-head self-attention layer.

    Args:
        nx: Hidden size of the incoming activations; must be divisible by ``config.n_head``.
        n_ctx: Context size, stored on the layer as ``self.n_ctx``.
        config: Model configuration. ``n_head``, ``output_attentions``, ``initializer_range``,
            ``attn_pdrop`` and ``resid_pdrop`` are read from it.
        scale: When true, raw attention scores are divided by ``sqrt(head_features)``.
    """

    def __init__(self, nx, n_ctx, config, scale=False, **kwargs):
        super().__init__(**kwargs)

        n_state = nx  # in Attention: n_state=768 (nx=n_embd)
        # [switch nx => n_state from Block to Attention to keep identical to TF implementation]
        assert (
            n_state % config.n_head == 0
        ), f"Hidden dimension {n_state} not dividable by number of heads {config.n_head}"
        self.n_ctx = n_ctx
        self.n_head = config.n_head
        self.split_size = n_state
        self.scale = scale
        self.output_attentions = config.output_attentions

        # c_attn produces query, key and value in one projection (3 * n_state features).
        self.c_attn = TFConv1D(n_state * 3, nx, initializer_range=config.initializer_range, name="c_attn")
        self.c_proj = TFConv1D(n_state, nx, initializer_range=config.initializer_range, name="c_proj")
        self.attn_dropout = tf.keras.layers.Dropout(config.attn_pdrop)
        self.resid_dropout = tf.keras.layers.Dropout(config.resid_pdrop)
        self.pruned_heads = set()

    def prune_heads(self, heads):
        """Head pruning is not implemented for this layer; the call is a no-op."""
        pass

    @staticmethod
    def causal_attention_mask(nd, ns):
        """
        1's in the lower triangle, counting from the lower right corner. Same as tf.matrix_band_part(tf.ones([nd, ns]),
        -1, ns-nd), but doesn't produce garbage on TPUs.

        Declared as a static method because it is invoked as ``self.causal_attention_mask(nd, ns)`` and takes no
        ``self`` parameter.
        """
        i = tf.range(nd)[:, None]
        j = tf.range(ns)
        m = i >= j - ns + nd
        return m

    def _attn(self, q, k, v, attention_mask, head_mask, output_attentions, training=False):
        """Compute masked attention.

        Returns a list ``[context]`` or ``[context, attention_probs]`` when ``output_attentions`` is truthy.
        """
        # q, k, v have shape [batch, heads, sequence, features]
        w = tf.matmul(q, k, transpose_b=True)
        if self.scale:
            dk = tf.cast(shape_list(k)[-1], dtype=w.dtype)  # scale attention_scores
            w = w / tf.math.sqrt(dk)

        # w has shape [batch, heads, dst_sequence, src_sequence], where information flows from src to dst.
        _, _, nd, ns = shape_list(w)
        b = tf.cast(self.causal_attention_mask(nd, ns), dtype=w.dtype)
        b = tf.reshape(b, [1, 1, nd, ns])
        # Keep allowed positions, push disallowed (future) positions to -1e4 before the softmax.
        w = w * b - 1e4 * (1 - b)

        if attention_mask is not None:
            # Apply the attention mask
            attention_mask = tf.cast(attention_mask, dtype=w.dtype)
            w = w + attention_mask

        w = tf.nn.softmax(w, axis=-1)
        w = self.attn_dropout(w, training=training)

        # Mask heads if we want to
        if head_mask is not None:
            w = w * head_mask

        outputs = [tf.matmul(w, v)]
        if output_attentions:
            outputs.append(w)
        return outputs

    def merge_heads(self, x):
        """Swap axes 1 and 2, then fold the last two axes into a single feature axis."""
        x = tf.transpose(x, [0, 2, 1, 3])
        x_shape = shape_list(x)
        new_x_shape = x_shape[:-2] + [x_shape[-2] * x_shape[-1]]
        return tf.reshape(x, new_x_shape)

    def split_heads(self, x):
        """Split the last axis into ``(n_head, features // n_head)`` and move the head axis to position 1."""
        x_shape = shape_list(x)
        new_x_shape = x_shape[:-1] + [self.n_head, x_shape[-1] // self.n_head]
        x = tf.reshape(x, new_x_shape)
        return tf.transpose(x, (0, 2, 1, 3))  # (batch, head, seq_length, head_features)

    def call(self, x, attention_mask, head_mask, output_attentions, training=False):
        """Project ``x`` to query/key/value, attend, merge heads and project back.

        Returns a list ``[a]`` or ``[a, attentions]``.
        """
        x = self.c_attn(x)
        query, key, value = tf.split(x, 3, axis=2)
        query = self.split_heads(query)
        key = self.split_heads(key)
        value = self.split_heads(value)

        attn_outputs = self._attn(query, key, value, attention_mask, head_mask, output_attentions, training=training)
        a = attn_outputs[0]

        a = self.merge_heads(a)
        a = self.c_proj(a)
        a = self.resid_dropout(a, training=training)

        outputs = [a] + attn_outputs[1:]
        return outputs  # a, (attentions)
class TFMLP(tf.keras.layers.Layer):
    """Feed-forward sub-layer: ``c_fc`` -> GELU -> ``c_proj`` -> dropout.

    Args:
        n_state: Width of the intermediate projection produced by ``c_fc``.
        config: Model configuration; ``n_embd``, ``initializer_range`` and ``resid_pdrop`` are read from it.
    """

    def __init__(self, n_state, config, **kwargs):
        super().__init__(**kwargs)
        embed_dim = config.n_embd
        init_range = config.initializer_range
        self.c_fc = TFConv1D(n_state, embed_dim, initializer_range=init_range, name="c_fc")
        self.c_proj = TFConv1D(embed_dim, n_state, initializer_range=init_range, name="c_proj")
        self.act = get_tf_activation("gelu")
        self.dropout = tf.keras.layers.Dropout(config.resid_pdrop)

    def call(self, x, training=False):
        """Apply the feed-forward transformation to ``x``; dropout is only active when ``training`` is true."""
        expanded = self.c_fc(x)
        activated = self.act(expanded)
        projected = self.c_proj(activated)
        return self.dropout(projected, training=training)
class TFBlock(tf.keras.layers.Layer):
    """One transformer block: attention and MLP, each followed by a residual add and layer normalization.

    Args:
        n_ctx: Context size forwarded to :class:`TFAttention`.
        config: Model configuration; ``n_embd`` and ``layer_norm_epsilon`` are read here.
        scale: Forwarded to :class:`TFAttention`.
    """

    def __init__(self, n_ctx, config, scale=False, **kwargs):
        super().__init__(**kwargs)
        hidden_size = config.n_embd
        eps = config.layer_norm_epsilon
        self.attn = TFAttention(hidden_size, n_ctx, config, scale, name="attn")
        self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=eps, name="ln_1")
        self.mlp = TFMLP(4 * hidden_size, config, name="mlp")
        self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=eps, name="ln_2")

    def call(self, x, attention_mask, head_mask, output_attentions, training=False):
        """Run the block on ``x`` and return ``[hidden_states]`` or ``[hidden_states, attentions]``."""
        # The attention layer returns a list: the attended tensor first, then the optional attention probabilities.
        attended, *attn_extras = self.attn(x, attention_mask, head_mask, output_attentions, training=training)

        # Layer normalization is applied after each residual sum.
        normed_attn = self.ln_1(x + attended)
        feed_forward = self.mlp(normed_attn, training=training)
        block_output = self.ln_2(normed_attn + feed_forward)

        return [block_output] + attn_extras  # x, (attentions)
@keras_serializable
class TFOpenAIGPTMainLayer(tf.keras.layers.Layer):
    """Core OpenAI GPT transformer: token + position embeddings followed by ``config.n_layer`` blocks.

    The layer is registered with ``keras_serializable`` (which relies on the ``config_class`` attribute below).
    """

    config_class = OpenAIGPTConfig

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(*inputs, **kwargs)

        self.config = config
        self.output_hidden_states = config.output_hidden_states
        self.output_attentions = config.output_attentions
        self.return_dict = config.use_return_dict
        self.num_hidden_layers = config.n_layer
        self.vocab_size = config.vocab_size
        self.n_embd = config.n_embd
        self.n_positions = config.n_positions
        self.initializer_range = config.initializer_range

        self.tokens_embed = TFSharedEmbeddings(
            config.vocab_size, config.n_embd, initializer_range=config.initializer_range, name="tokens_embed"
        )
        self.drop = tf.keras.layers.Dropout(config.embd_pdrop)
        self.h = [TFBlock(config.n_ctx, config, scale=True, name=f"h_._{i}") for i in range(config.n_layer)]

    def build(self, input_shape):
        """Create the position-embedding weight of shape ``[n_positions, n_embd]``."""
        with tf.name_scope("positions_embed"):
            self.positions_embed = self.add_weight(
                name="embeddings",
                shape=[self.n_positions, self.n_embd],
                initializer=get_initializer(self.initializer_range),
            )

        super().build(input_shape)

    def get_input_embeddings(self):
        """Return the shared token-embedding layer."""
        return self.tokens_embed

    def set_input_embeddings(self, value):
        """Replace the token-embedding weight and update the recorded vocabulary size from its first dimension."""
        self.tokens_embed.weight = value
        self.tokens_embed.vocab_size = shape_list(value)[0]

    def _prune_heads(self, heads_to_prune):
        """
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer}
        """
        raise NotImplementedError

    def call(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        training=False,
        **kwargs,
    ):
        """Embed the inputs, run them through every block and return the hidden states.

        Returns:
            A :class:`TFBaseModelOutput` when ``return_dict`` resolves to true, otherwise a tuple of the non-``None``
            values among ``(last_hidden_state, all_hidden_states, all_attentions)``.

        Raises:
            ValueError: If both or neither of ``input_ids`` and ``inputs_embeds`` are provided.
            NotImplementedError: If ``head_mask`` is provided.
        """
        inputs = input_processing(
            func=self.call,
            config=self.config,
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            training=training,
            kwargs_call=kwargs,
        )

        if inputs["input_ids"] is not None and inputs["inputs_embeds"] is not None:
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif inputs["input_ids"] is not None:
            input_shape = shape_list(inputs["input_ids"])
            inputs["input_ids"] = tf.reshape(inputs["input_ids"], [-1, input_shape[-1]])
        elif inputs["inputs_embeds"] is not None:
            input_shape = shape_list(inputs["inputs_embeds"])[:-1]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        if inputs["position_ids"] is None:
            inputs["position_ids"] = tf.expand_dims(tf.range(input_shape[-1]), axis=0)

        if inputs["attention_mask"] is not None:
            # We create a 4D broadcastable attention mask from a 2D tensor mask.
            # Sizes are [batch_size, 1, 1, to_seq_length]
            # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
            # this attention mask is more simple than the triangular masking of causal attention
            # used in OpenAI GPT, we just need to prepare the broadcast dimension here.
            inputs["attention_mask"] = tf.reshape(inputs["attention_mask"], (input_shape[0], 1, 1, input_shape[1]))

            # Since attention_mask is 1.0 for positions we want to attend and 0.0 for
            # masked positions, this operation will create a tensor which is 0.0 for
            # positions we want to attend and -10000.0 for masked positions.
            # Since we are adding it to the raw scores before the softmax, this is
            # effectively the same as removing these entirely.
            one_cst = tf.constant(1.0)
            inputs["attention_mask"] = tf.cast(inputs["attention_mask"], dtype=one_cst.dtype)
            inputs["attention_mask"] = tf.multiply(
                tf.subtract(one_cst, inputs["attention_mask"]), tf.constant(-10000.0)
            )
        else:
            inputs["attention_mask"] = None

        # Prepare head mask if needed
        # 1.0 in head_mask indicate we keep the head
        # attention_probs has shape bsz x n_heads x N x N
        # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads]
        # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length]
        if inputs["head_mask"] is not None:
            raise NotImplementedError
        else:
            inputs["head_mask"] = [None] * self.num_hidden_layers

        inputs["position_ids"] = tf.reshape(inputs["position_ids"], [-1, shape_list(inputs["position_ids"])[-1]])

        if inputs["inputs_embeds"] is None:
            inputs["inputs_embeds"] = self.tokens_embed(inputs["input_ids"], mode="embedding")
        position_embeds = tf.gather(self.positions_embed, inputs["position_ids"])
        if inputs["token_type_ids"] is not None:
            inputs["token_type_ids"] = tf.reshape(
                inputs["token_type_ids"], [-1, shape_list(inputs["token_type_ids"])[-1]]
            )
            # Token type ids are looked up in the same embedding table as the input tokens.
            token_type_embeds = self.tokens_embed(inputs["token_type_ids"], mode="embedding")
        else:
            token_type_embeds = 0
        hidden_states = inputs["inputs_embeds"] + position_embeds + token_type_embeds
        hidden_states = self.drop(hidden_states, training=inputs["training"])

        # Shape used to restore any leading dimensions that were flattened above.
        output_shape = input_shape + [shape_list(hidden_states)[-1]]

        all_attentions = () if inputs["output_attentions"] else None
        all_hidden_states = () if inputs["output_hidden_states"] else None
        for i, block in enumerate(self.h):
            if inputs["output_hidden_states"]:
                all_hidden_states = all_hidden_states + (tf.reshape(hidden_states, output_shape),)

            outputs = block(
                hidden_states,
                inputs["attention_mask"],
                inputs["head_mask"][i],
                inputs["output_attentions"],
                training=inputs["training"],
            )
            hidden_states = outputs[0]
            if inputs["output_attentions"]:
                all_attentions = all_attentions + (outputs[1],)

        hidden_states = tf.reshape(hidden_states, output_shape)
        # Add last hidden state
        if inputs["output_hidden_states"]:
            all_hidden_states = all_hidden_states + (hidden_states,)

        if inputs["output_attentions"]:
            # let the number of heads free (-1) so we can extract attention even after head pruning
            attention_output_shape = input_shape[:-1] + [-1] + shape_list(all_attentions[0])[-2:]
            all_attentions = tuple(tf.reshape(t, attention_output_shape) for t in all_attentions)

        if not inputs["return_dict"]:
            return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None)

        return TFBaseModelOutput(
            last_hidden_state=hidden_states,
            hidden_states=all_hidden_states,
            attentions=all_attentions,
        )
class TFOpenAIGPTPreTrainedModel(TFPreTrainedModel):
    """
    An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
    models.
    """

    config_class = OpenAIGPTConfig
    base_model_prefix = "transformer"

    # Wrapped as a tf.function with an explicit input signature so the method can be traced and used as a
    # serving signature. The signature covers 2-D int32 `input_ids` and `attention_mask` tensors.
    @tf.function(
        input_signature=[
            {
                "input_ids": tf.TensorSpec((None, None), tf.int32, name="input_ids"),
                "attention_mask": tf.TensorSpec((None, None), tf.int32, name="attention_mask"),
            }
        ]
    )
    def serving(self, inputs):
        """Run ``call`` on the dict ``inputs`` and post-process the result with ``serving_output``."""
        output = self.call(inputs)

        return self.serving_output(output)
@dataclass
class TFOpenAIGPTDoubleHeadsModelOutput(ModelOutput):
    """
    Output type of the double-heads model: language modeling logits plus multiple choice classification logits.

    Args:
        logits (:obj:`tf.Tensor` of shape :obj:`(batch_size, num_choices, sequence_length, config.vocab_size)`):
            Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
        mc_logits (:obj:`tf.Tensor` of shape :obj:`(batch_size, num_choices)`):
            Prediction scores of the multiple choice classification head (scores for each choice before SoftMax).
        hidden_states (:obj:`tuple(tf.Tensor)`, `optional`, returned when ``output_hidden_states=True`` is passed or when ``config.output_hidden_states=True``):
            Tuple of :obj:`tf.Tensor` (one for the output of the embeddings + one for the output of each layer) of
            shape :obj:`(batch_size, sequence_length, hidden_size)`.

            Hidden-states of the model at the output of each layer plus the initial embedding outputs.
        attentions (:obj:`tuple(tf.Tensor)`, `optional`, returned when ``output_attentions=True`` is passed or when ``config.output_attentions=True``):
            Tuple of :obj:`tf.Tensor` (one for each layer) of shape :obj:`(batch_size, num_heads, sequence_length,
            sequence_length)`.

            Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
            heads.
    """

    logits: tf.Tensor = None
    mc_logits: tf.Tensor = None
    hidden_states: Optional[Tuple[tf.Tensor]] = None
    attentions: Optional[Tuple[tf.Tensor]] = None
# Shared class-level documentation for the OpenAI GPT TF models (reStructuredText).
OPENAI_GPT_START_DOCSTRING = r"""

    This model inherits from :class:`~transformers.TFPreTrainedModel`. Check the superclass documentation for the
    generic methods the library implements for all its model (such as downloading or saving, resizing the input
    embeddings, pruning heads etc.)

    This model is also a `tf.keras.Model <https://www.tensorflow.org/api_docs/python/tf/keras/Model>`__ subclass. Use
    it as a regular TF 2.0 Keras Model and refer to the TF 2.0 documentation for all matter related to general usage
    and behavior.

    .. note::

        TF 2.0 models accepts two formats as inputs:

        - having all inputs as keyword arguments (like PyTorch models), or
        - having all inputs as a list, tuple or dict in the first positional arguments.

        This second option is useful when using :meth:`tf.keras.Model.fit` method which currently requires having all
        the tensors in the first argument of the model call function: :obj:`model(inputs)`.

        If you choose this second option, there are three possibilities you can use to gather all the input Tensors in
        the first positional argument :

        - a single Tensor with :obj:`input_ids` only and nothing else: :obj:`model(inputs_ids)`
        - a list of varying length with one or several input Tensors IN THE ORDER given in the docstring:
          :obj:`model([input_ids, attention_mask])` or :obj:`model([input_ids, attention_mask, token_type_ids])`
        - a dictionary with one or several input Tensors associated to the input names given in the docstring:
          :obj:`model({"input_ids": input_ids, "token_type_ids": token_type_ids})`

    Parameters:
        config (:class:`~transformers.OpenAIGPTConfig`): Model configuration class with all the parameters of the model.
            Initializing with a config file does not load the weights associated with the model, only the
            configuration. Check out the :meth:`~transformers.PreTrainedModel.from_pretrained` method to load the model
            weights.
"""
# Shared documentation of the `call` arguments for the OpenAI GPT TF models (reStructuredText).
OPENAI_GPT_INPUTS_DOCSTRING = r"""
    Args:
        input_ids (:obj:`Numpy array` or :obj:`tf.Tensor` of shape :obj:`(batch_size, sequence_length)`):
            Indices of input sequence tokens in the vocabulary.

            Indices can be obtained using :class:`~transformers.OpenAIGPTTokenizer`. See
            :func:`transformers.PreTrainedTokenizer.__call__` and :func:`transformers.PreTrainedTokenizer.encode` for
            details.

            `What are input IDs? <../glossary.html#input-ids>`__
        attention_mask (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length)`, `optional`):
            Mask to avoid performing attention on padding token indices. Mask values selected in ``[0, 1]``:

            - 1 for tokens that are **not masked**,
            - 0 for tokens that are **masked**.

            `What are attention masks? <../glossary.html#attention-mask>`__
        token_type_ids (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length)`, `optional`):
            Segment token indices to indicate first and second portions of the inputs. Indices are selected in ``[0,
            1]``:

            - 0 corresponds to a `sentence A` token,
            - 1 corresponds to a `sentence B` token.

            `What are token type IDs? <../glossary.html#token-type-ids>`__
        position_ids (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length)`, `optional`):
            Indices of positions of each input sequence tokens in the position embeddings. Selected in the range ``[0,
            config.max_position_embeddings - 1]``.

            `What are position IDs? <../glossary.html#position-ids>`__
        head_mask (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(num_heads,)` or :obj:`(num_layers, num_heads)`, `optional`):
            Mask to nullify selected heads of the self-attention modules. Mask values selected in ``[0, 1]``:

            - 1 indicates the head is **not masked**,
            - 0 indicates the head is **masked**.

        inputs_embeds (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, sequence_length, hidden_size)`, `optional`):
            Optionally, instead of passing :obj:`input_ids` you can choose to directly pass an embedded representation.
            This is useful if you want more control over how to convert :obj:`input_ids` indices into associated
            vectors than the model's internal embedding lookup matrix.
        output_attentions (:obj:`bool`, `optional`):
            Whether or not to return the attentions tensors of all attention layers. See ``attentions`` under returned
            tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the
            config will be used instead.
        output_hidden_states (:obj:`bool`, `optional`):
            Whether or not to return the hidden states of all layers. See ``hidden_states`` under returned tensors for
            more detail. This argument can be used only in eager mode, in graph mode the value in the config will be
            used instead.
        return_dict (:obj:`bool`, `optional`):
            Whether or not to return a :class:`~transformers.file_utils.ModelOutput` instead of a plain tuple. This
            argument can be used in eager mode, in graph mode the value will always be set to True.
        training (:obj:`bool`, `optional`, defaults to :obj:`False`):
            Whether or not to use the model in training mode (some modules like dropout modules have different
            behaviors between training and evaluation).
"""
class TFOpenAIGPTModel(TFOpenAIGPTPreTrainedModel):
    """OpenAI GPT model that returns the outputs of the main transformer layer without any head on top."""

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.transformer = TFOpenAIGPTMainLayer(config, name="transformer")

    def call(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        training=False,
        **kwargs,
    ):
        """Normalize the call arguments and delegate to the main transformer layer, returning its output as is."""
        inputs = input_processing(
            func=self.call,
            config=self.config,
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            training=training,
            kwargs_call=kwargs,
        )

        # Every processed entry listed here is forwarded to the main layer under the same keyword.
        forwarded_keys = (
            "input_ids",
            "attention_mask",
            "token_type_ids",
            "position_ids",
            "head_mask",
            "inputs_embeds",
            "output_attentions",
            "output_hidden_states",
            "return_dict",
            "training",
        )
        return self.transformer(**{key: inputs[key] for key in forwarded_keys})

    # Copied from transformers.models.distilbert.modeling_tf_distilbert.TFDistilBertModel.serving_output
    def serving_output(self, output):
        hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None
        attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None

        return TFBaseModelOutput(last_hidden_state=output.last_hidden_state, hidden_states=hs, attentions=attns)
class TFOpenAIGPTLMHeadModel(TFOpenAIGPTPreTrainedModel, TFCausalLanguageModelingLoss):
    """OpenAI GPT model with a language modeling head that reuses the token-embedding layer in ``"linear"`` mode."""

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.transformer = TFOpenAIGPTMainLayer(config, name="transformer")

    def get_output_embeddings(self):
        """Return the output embeddings, which are the input embeddings."""
        return self.get_input_embeddings()

    def set_output_embeddings(self, value):
        """Set the output embeddings by setting the input embeddings."""
        self.set_input_embeddings(value)

    def call(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
        training=False,
        **kwargs,
    ):
        r"""
        labels (:obj:`tf.Tensor` of shape :obj:`(batch_size, sequence_length)`, `optional`):
            Labels for computing the cross entropy classification loss. Indices should be in ``[0, ...,
            config.vocab_size - 1]``.
        """
        inputs = input_processing(
            func=self.call,
            config=self.config,
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            labels=labels,
            training=training,
            kwargs_call=kwargs,
        )

        # Everything except `labels` is handed to the main layer under the same keyword.
        forwarded_keys = (
            "input_ids",
            "attention_mask",
            "token_type_ids",
            "position_ids",
            "head_mask",
            "inputs_embeds",
            "output_attentions",
            "output_hidden_states",
            "return_dict",
            "training",
        )
        transformer_outputs = self.transformer(**{key: inputs[key] for key in forwarded_keys})

        # Project the last hidden states back onto the vocabulary with the shared embedding layer.
        logits = self.transformer.tokens_embed(transformer_outputs[0], mode="linear")

        targets = inputs["labels"]
        loss = None
        if targets is not None:
            # shift labels to the left and cut last logit token
            logits = logits[:, :-1]
            loss = self.compute_loss(targets[:, 1:], logits)

        if inputs["return_dict"]:
            return TFCausalLMOutput(
                loss=loss,
                logits=logits,
                hidden_states=transformer_outputs.hidden_states,
                attentions=transformer_outputs.attentions,
            )

        output = (logits,) + transformer_outputs[1:]
        return output if loss is None else (loss,) + output

    # Copied from transformers.models.bert.modeling_tf_bert.TFBertLMHeadModel.serving_output
    def serving_output(self, output: TFCausalLMOutput) -> TFCausalLMOutput:
        hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None
        attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None

        return TFCausalLMOutput(logits=output.logits, hidden_states=hs, attentions=attns)
class TFOpenAIGPTDoubleHeadsModel(TFOpenAIGPTPreTrainedModel):
    """OpenAI GPT model with a language modeling head and a multiple choice classification head."""

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        # The multiple choice head emits a single score per choice (the last axis is squeezed away in `call`).
        config.num_labels = 1
        self.transformer = TFOpenAIGPTMainLayer(config, name="transformer")
        self.multiple_choice_head = TFSequenceSummary(
            config, initializer_range=config.initializer_range, name="multiple_choice_head"
        )

    def call(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        mc_token_ids=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        training=False,
        **kwargs,
    ):
        r"""
        mc_token_ids (:obj:`tf.Tensor` or :obj:`Numpy array` of shape :obj:`(batch_size, num_choices)`, `optional`, default to index of the last token of the input):
            Index of the classification token in each input sequence. Selected in the range ``[0, input_ids.size(-1) -
            1]``.

        Return:

        Examples::

            >>> import tensorflow as tf
            >>> from transformers import OpenAIGPTTokenizer, TFOpenAIGPTDoubleHeadsModel

            >>> tokenizer = OpenAIGPTTokenizer.from_pretrained('openai-gpt')
            >>> model = TFOpenAIGPTDoubleHeadsModel.from_pretrained('openai-gpt')

            >>> # Add a [CLS] to the vocabulary (we should train it also!)
            >>> tokenizer.add_special_tokens({'cls_token': '[CLS]'})
            >>> model.resize_token_embeddings(len(tokenizer))  # Update the model embeddings with the new vocabulary size
            >>> print(tokenizer.cls_token_id, len(tokenizer))  # The newly token the last token of the vocabulary

            >>> choices = ["Hello, my dog is cute [CLS]", "Hello, my cat is cute [CLS]"]
            >>> encoding = tokenizer(choices, return_tensors="tf")
            >>> inputs = {k: tf.expand_dims(v, 0) for k, v in encoding.items()}
            >>> inputs["mc_token_ids"]= tf.constant([inputs["input_ids"].shape[-1] - 1, inputs["input_ids"].shape[-1] - 1])[None, :]  # Batch size 1
            >>> outputs = model(inputs)
            >>> lm_prediction_scores, mc_prediction_scores = outputs[:2]
        """
        inputs = input_processing(
            func=self.call,
            config=self.config,
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            mc_token_ids=mc_token_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            training=training,
            kwargs_call=kwargs,
        )

        if inputs["input_ids"] is not None:
            input_shapes = shape_list(inputs["input_ids"])
        else:
            input_shapes = shape_list(inputs["inputs_embeds"])[:-1]

        # Collapse every leading dimension so the transformer sees 2-D (rows, seq_length) inputs.
        seq_length = input_shapes[-1]
        flat_input_ids = tf.reshape(inputs["input_ids"], (-1, seq_length)) if inputs["input_ids"] is not None else None
        flat_attention_mask = (
            tf.reshape(inputs["attention_mask"], (-1, seq_length)) if inputs["attention_mask"] is not None else None
        )
        flat_token_type_ids = (
            tf.reshape(inputs["token_type_ids"], (-1, seq_length)) if inputs["token_type_ids"] is not None else None
        )
        flat_position_ids = (
            tf.reshape(inputs["position_ids"], (-1, seq_length)) if inputs["position_ids"] is not None else None
        )
        transformer_outputs = self.transformer(
            flat_input_ids,
            flat_attention_mask,
            flat_token_type_ids,
            flat_position_ids,
            inputs["head_mask"],
            inputs["inputs_embeds"],
            inputs["output_attentions"],
            inputs["output_hidden_states"],
            return_dict=inputs["return_dict"],
            training=inputs["training"],
        )
        hidden_states = transformer_outputs[0]
        # Restore the original leading dimensions before applying the two heads.
        hidden_states = tf.reshape(hidden_states, input_shapes + shape_list(hidden_states)[-1:])
        lm_logits = self.transformer.tokens_embed(hidden_states, mode="linear")
        mc_logits = self.multiple_choice_head(hidden_states, inputs["mc_token_ids"], training=inputs["training"])
        mc_logits = tf.squeeze(mc_logits, axis=-1)

        if not inputs["return_dict"]:
            return (lm_logits, mc_logits) + transformer_outputs[1:]

        return TFOpenAIGPTDoubleHeadsModelOutput(
            logits=lm_logits,
            mc_logits=mc_logits,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
        )

    # Wrapped as a tf.function with an explicit input signature so the method can be traced and used as a
    # serving signature. `input_ids` / `attention_mask` are 3-D here because `call` flattens the leading
    # dimensions itself; `mc_token_ids` is 2-D, matching the shape documented in `call`.
    @tf.function(
        input_signature=[
            {
                "input_ids": tf.TensorSpec((None, None, None), tf.int32, name="input_ids"),
                "attention_mask": tf.TensorSpec((None, None, None), tf.int32, name="attention_mask"),
                "mc_token_ids": tf.TensorSpec((None, None), tf.int32, name="mc_token_ids"),
            }
        ]
    )
    def serving(self, inputs):
        """Run ``call`` on the dict ``inputs`` and post-process the result with ``serving_output``."""
        output = self.call(inputs)

        return self.serving_output(output)

    def serving_output(self, output):
        """Convert the optional tuples of hidden states / attentions to tensors, as enabled in the config."""
        hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None
        attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None

        return TFOpenAIGPTDoubleHeadsModelOutput(
            logits=output.logits, mc_logits=output.mc_logits, hidden_states=hs, attentions=attns
        )
class TFOpenAIGPTForSequenceClassification(TFOpenAIGPTPreTrainedModel, TFSequenceClassificationLoss):
    """OpenAI GPT model with a bias-free linear classification head applied to one position per sequence.

    When ``config.pad_token_id`` is set and ``input_ids`` are given, the position used is the number of
    non-padding tokens minus one; otherwise the last position is used when a loss is computed.
    """

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.num_labels = config.num_labels
        self.score = tf.keras.layers.Dense(
            config.num_labels,
            kernel_initializer=get_initializer(config.initializer_range),
            name="score",
            use_bias=False,
        )
        self.transformer = TFOpenAIGPTMainLayer(config, name="transformer")

    def call(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
        training=False,
        **kwargs,
    ):
        r"""
        labels (:obj:`tf.Tensor` of shape :obj:`(batch_size,)`, `optional`):
            Labels for computing the sequence classification loss, one per sequence. They are reshaped to ``(-1,
            1)`` and compared against logits of shape ``(-1, config.num_labels)``; for classification, indices are
            presumably expected in ``[0, ..., config.num_labels - 1]``.
        """
        inputs = input_processing(
            func=self.call,
            config=self.config,
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            labels=labels,
            training=training,
            kwargs_call=kwargs,
        )

        transformer_outputs = self.transformer(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            token_type_ids=inputs["token_type_ids"],
            position_ids=inputs["position_ids"],
            head_mask=inputs["head_mask"],
            inputs_embeds=inputs["inputs_embeds"],
            output_attentions=inputs["output_attentions"],
            output_hidden_states=inputs["output_hidden_states"],
            return_dict=inputs["return_dict"],
            training=inputs["training"],
        )

        hidden_states = transformer_outputs[0]
        logits = self.score(hidden_states)
        in_logits = None
        if self.config.pad_token_id is None:
            sequence_lengths = -1
        elif inputs["input_ids"] is not None:
            # Index of the last non-padding token: (number of non-pad tokens) - 1, per sequence.
            sequence_lengths = (
                tf.reduce_sum(
                    tf.cast(
                        tf.math.not_equal(inputs["input_ids"], self.config.pad_token_id),
                        dtype=inputs["input_ids"].dtype,
                    ),
                    -1,
                    keepdims=False,
                )
                - 1
            )
            in_logits = tf.gather(logits, sequence_lengths, batch_dims=1, axis=1)
        else:
            sequence_lengths = -1
            logger.warning(
                f"{self.__class__.__name__} will not detect padding tokens in `inputs_embeds`. Results may be "
                f"unexpected if using padding tokens in conjunction with `inputs_embeds.`"
            )
        loss = None

        if inputs["labels"] is not None:
            # Use the processed inputs, not the raw `input_ids` argument: when all inputs are packed into the
            # first positional argument, the raw argument is the container rather than the ids tensor.
            if inputs["input_ids"] is not None:
                batch_size, sequence_length = shape_list(inputs["input_ids"])[:2]
            else:
                batch_size, sequence_length = shape_list(inputs["inputs_embeds"])[:2]
            assert (
                self.config.pad_token_id is not None or batch_size == 1
            ), "Cannot handle batch sizes > 1 if no padding token is defined."

            if not tf.is_tensor(sequence_lengths):
                # No per-sequence lengths were computed: fall back to the last position (-1).
                in_logits = logits[0:batch_size, sequence_lengths]

            loss = self.compute_loss(
                tf.reshape(inputs["labels"], [-1, 1]), tf.reshape(in_logits, [-1, self.num_labels])
            )

        # Without pooled logits, the per-position logits are returned unchanged.
        pooled_logits = in_logits if in_logits is not None else logits

        if not inputs["return_dict"]:
            output = (pooled_logits,) + transformer_outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return TFSequenceClassifierOutput(
            loss=loss,
            logits=pooled_logits,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
        )

    # Copied from transformers.models.bert.modeling_tf_bert.TFBertForSequenceClassification.serving_output
    def serving_output(self, output: TFSequenceClassifierOutput) -> TFSequenceClassifierOutput:
        hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None
        attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None

        return TFSequenceClassifierOutput(logits=output.logits, hidden_states=hs, attentions=attns)