|
|
|
from torch import nn |
|
|
|
from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModel |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AutoEncoderConfig(PretrainedConfig): |
|
model_type = "autoencoder" |
|
|
|
def __init__( |
|
self, |
|
input_dim=None, |
|
latent_dim=None, |
|
layer_types=None, |
|
dropout_rate=None, |
|
num_layers=None, |
|
compression_rate=None, |
|
bidirectional=False, |
|
**kwargs |
|
): |
|
super().__init__(**kwargs) |
|
self.input_dim = input_dim |
|
self.latent_dim = latent_dim |
|
self.layer_types = layer_types |
|
self.dropout_rate = dropout_rate |
|
self.num_layers = num_layers |
|
self.compression_rate = compression_rate |
|
self.bidirectional = bidirectional |
|
|
|
def create_layers(model_section, layer_types, input_dim, latent_dim, num_layers, dropout_rate, compression_rate, bidirectional): |
|
|
|
layers = [] |
|
current_dim = input_dim |
|
|
|
input_diamensions = [] |
|
output_diamensions = [] |
|
|
|
for _ in range(num_layers): |
|
input_diamensions.append(current_dim) |
|
next_dim = max(int(current_dim * compression_rate), latent_dim) |
|
current_dim = next_dim |
|
output_diamensions.append(current_dim) |
|
|
|
output_diamensions[num_layers - 1] = latent_dim |
|
|
|
if model_section == "decoder": |
|
input_diamensions, output_diamensions = output_diamensions, input_diamensions |
|
input_diamensions.reverse() |
|
output_diamensions.reverse() |
|
|
|
if bidirectional & (layer_types in ['lstm', 'rnn', 'gru']): |
|
output_diamensions = [2*value for value in output_diamensions] |
|
|
|
for idx, (input_dim, output_dim) in enumerate(zip(input_diamensions, output_diamensions)): |
|
if layer_types == 'linear': |
|
layers.append(nn.Linear(input_dim, output_dim)) |
|
elif layer_types == 'lstm': |
|
layers.append(nn.LSTM(input_dim, output_dim // (2 if bidirectional else 1), batch_first=True, bidirectional=bidirectional)) |
|
elif layer_types == 'rnn': |
|
layers.append(nn.RNN(input_dim, output_dim // (2 if bidirectional else 1), batch_first=True, bidirectional=bidirectional)) |
|
elif layer_types == 'gru': |
|
layers.append(nn.GRU(input_dim, output_dim // (2 if bidirectional else 1), batch_first=True, bidirectional=bidirectional)) |
|
if (idx != num_layers - 1) & (dropout_rate != None): |
|
layers.append(nn.Dropout(dropout_rate)) |
|
return nn.Sequential(*layers) |
|
|
|
class AutoEncoder(PreTrainedModel): |
|
config_class = AutoEncoderConfig |
|
|
|
def __init__(self, config): |
|
super(AutoEncoder, self).__init__(config) |
|
|
|
self.encoder = create_layers("encoder", |
|
config.layer_types, config.input_dim, config.latent_dim, |
|
config.num_layers, config.dropout_rate, config.compression_rate, |
|
config.bidirectional, |
|
) |
|
|
|
self.decoder = create_layers("decoder", |
|
config.layer_types, config.input_dim, config.latent_dim, |
|
config.num_layers, config.dropout_rate, config.compression_rate, |
|
config.bidirectional, |
|
) |
|
|
|
def forward(self, x): |
|
if self.config.layer_types in ['lstm', 'rnn', 'gru']: |
|
for layer in self.encoder: |
|
print(layer) |
|
if isinstance(layer, nn.LSTM): |
|
x, (h_n, c_n)= layer(x) |
|
|
|
elif isinstance(layer, nn.RNN): |
|
x, h_o = layer(x) |
|
elif isinstance(layer, nn.GRU): |
|
x, h_o = layer(x) |
|
else: |
|
x = layer(x) |
|
|
|
for layer in self.decoder: |
|
if isinstance(layer, nn.LSTM): |
|
x, (h_n, c_n) = layer(x) |
|
elif isinstance(layer, nn.RNN): |
|
x, h_o = layer(x) |
|
elif isinstance(layer, nn.GRU): |
|
x, h_o = layer(x) |
|
else: |
|
x = layer(x) |
|
|
|
else: |
|
x = self.encoder(x) |
|
x = self.decoder(x) |
|
return x |