# Copyright (c) ByteDance, Inc. and its affiliates.
# Copyright (c) Chutong Meng
#
# This source code is licensed under the CC BY-NC license found in the
# LICENSE file in the root directory of this source tree.
# Based on AudioDec (https://github.com/facebookresearch/AudioDec)

import torch
import torch.nn as nn
import torch.nn.functional as F
class VectorQuantize(nn.Module):
    """Vector quantization with exponential moving average (EMA) codebook updates."""

    def __init__(
        self,
        dim: int,
        codebook_size: int,
        decay=0.8,
        commitment=1.,
        eps=1e-5,
        n_embed=None,
    ):
        super().__init__()
        n_embed = self.default(n_embed, codebook_size)

        self.dim = dim
        self.n_embed = n_embed
        self.decay = decay
        self.eps = eps
        self.commitment = commitment

        embed = torch.randn(dim, n_embed)
        self.register_buffer('embed', embed)
        self.register_buffer('cluster_size', torch.zeros(n_embed))
        self.register_buffer('embed_avg', embed.clone())
    @property
    def codebook(self):
        # Exposed as a property so ResidualVQ.initial() can stack the codebook tensors directly.
        return self.embed.transpose(0, 1)

    def exists(self, val):
        return val is not None

    def default(self, val, d):
        return val if self.exists(val) else d

    def ema_inplace(self, moving_avg, new, decay):
        moving_avg.data.mul_(decay).add_(new, alpha=(1 - decay))

    def laplace_smoothing(self, x, n_categories, eps=1e-5):
        return (x + eps) / (x.sum() + n_categories * eps)
    def forward(self, input):
        dtype = input.dtype
        flatten = input.reshape(-1, self.dim)
        # Squared Euclidean distance from every input vector to every codeword.
        dist = (
            flatten.pow(2).sum(1, keepdim=True)
            - 2 * flatten @ self.embed
            + self.embed.pow(2).sum(0, keepdim=True)
        )
        _, embed_ind = (-dist).max(1)
        embed_onehot = F.one_hot(embed_ind, self.n_embed).type(dtype)
        embed_ind = embed_ind.view(*input.shape[:-1])
        quantize = F.embedding(embed_ind, self.embed.transpose(0, 1))

        if self.training:
            # EMA update of cluster sizes and per-codeword sums; the codebook is
            # refreshed from these running averages rather than by gradients.
            self.ema_inplace(self.cluster_size, embed_onehot.sum(0), self.decay)
            embed_sum = flatten.transpose(0, 1) @ embed_onehot
            self.ema_inplace(self.embed_avg, embed_sum, self.decay)
            cluster_size = self.laplace_smoothing(self.cluster_size, self.n_embed, self.eps) * self.cluster_size.sum()
            embed_normalized = self.embed_avg / cluster_size.unsqueeze(0)
            self.embed.data.copy_(embed_normalized)

        # Commitment loss plus the straight-through estimator for the encoder gradient.
        loss = F.mse_loss(quantize.detach(), input) * self.commitment
        quantize = input + (quantize - input).detach()

        avg_probs = torch.mean(embed_onehot, dim=0)
        perplexity = torch.exp(-torch.sum(avg_probs * torch.log(avg_probs + 1e-10)))

        return quantize, loss, perplexity
    def forward_index(self, input):
        # Same nearest-neighbour search as forward(), but also returns the codeword
        # indices and skips the EMA update and commitment loss.
        dtype = input.dtype
        flatten = input.reshape(-1, self.dim)
        dist = (
            flatten.pow(2).sum(1, keepdim=True)
            - 2 * flatten @ self.embed
            + self.embed.pow(2).sum(0, keepdim=True)
        )
        _, embed_ind = (-dist).max(1)
        embed_onehot = F.one_hot(embed_ind, self.n_embed).type(dtype)
        embed_ind = embed_ind.view(*input.shape[:-1])
        quantize = F.embedding(embed_ind, self.embed.transpose(0, 1))
        quantize = input + (quantize - input).detach()
        return quantize, embed_ind
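

# --- Illustrative usage (not part of the original file) ----------------------
# A minimal sketch of how VectorQuantize might be exercised on dummy features.
# The shapes and hyper-parameters below are assumptions for this demo, not
# values prescribed by the repository.
def _demo_vector_quantize():
    vq = VectorQuantize(dim=64, codebook_size=256)   # hypothetical sizes
    x = torch.randn(8, 50, 64)                       # (batch, time, dim) dummy input
    vq.train()
    quantized, commit_loss, perplexity = vq(x)       # EMA codebook update happens here
    assert quantized.shape == x.shape
    vq.eval()
    quantized, indices = vq.forward_index(x)         # also returns codeword indices
    assert indices.shape == x.shape[:-1]
    return commit_loss, perplexity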
class ResidualVQ(nn.Module):
    """Residual VQ following Algorithm 1 in https://arxiv.org/pdf/2107.03312.pdf (SoundStream)."""

    def __init__(
        self,
        *,
        num_quantizers,
        **kwargs
    ):
        super().__init__()
        self.layers = nn.ModuleList([VectorQuantize(**kwargs) for _ in range(num_quantizers)])

    def forward(self, x):
        quantized_out = 0.
        residual = x
        all_losses = []
        all_perplexities = []
        for layer in self.layers:
            quantized, loss, perplexity = layer(residual)
            # Issue: https://github.com/lucidrains/vector-quantize-pytorch/issues/33
            # We found that considering only the first VQ layer's gradient results in better performance.
            # residual = residual - quantized.detach()  # considering all layers' gradients
            residual = residual - quantized  # considering only the first layer's gradient
            quantized_out = quantized_out + quantized
            all_losses.append(loss)
            all_perplexities.append(perplexity)
        all_losses, all_perplexities = map(torch.stack, (all_losses, all_perplexities))
        return quantized_out, all_losses, all_perplexities
    def forward_index(self, x, flatten_idx=False):
        quantized_out = 0.
        residual = x
        all_indices = []
        for i, layer in enumerate(self.layers):
            quantized, indices = layer.forward_index(residual)
            # residual = residual - quantized.detach()
            residual = residual - quantized
            quantized_out = quantized_out + quantized
            if flatten_idx:
                # Offset each quantizer's indices so they address disjoint rows
                # of the flattened codebook built in initial().
                indices += (self.codebook_size * i)
            all_indices.append(indices)
        all_indices = torch.stack(all_indices)
        return quantized_out, all_indices.squeeze(1)

    def initial(self):
        # Snapshot all per-layer codebooks into one flattened table of shape
        # (num_quantizers * codebook_size, dim), used by forward_index(flatten_idx=True) and lookup().
        self.codebook = []
        for layer in self.layers:
            self.codebook.append(layer.codebook)
        self.codebook_size = self.codebook[0].size(0)
        self.codebook = torch.stack(self.codebook)
        self.codebook = self.codebook.reshape(-1, self.codebook.size(-1))

    def lookup(self, indices):
        quantized_out = F.embedding(indices, self.codebook)  # Num x T x C
        return torch.sum(quantized_out, dim=0, keepdim=True)
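

# --- Illustrative usage (not part of the original file) ----------------------
# A minimal smoke test for ResidualVQ; the (batch=1, time, dim) shape and the
# sizes below are assumptions for this demo, not repository defaults.
if __name__ == "__main__":
    rvq = ResidualVQ(num_quantizers=4, dim=64, codebook_size=256)
    x = torch.randn(1, 50, 64)

    # Training-style pass: summed quantization plus per-layer losses/perplexities.
    rvq.train()
    quantized, losses, perplexities = rvq(x)
    print(quantized.shape, losses.shape, perplexities.shape)     # (1, 50, 64), (4,), (4,)

    # Inference: build the flattened codebook, encode to indices, decode with lookup().
    rvq.eval()
    rvq.initial()
    quantized, indices = rvq.forward_index(x, flatten_idx=True)  # indices: (4, 50)
    decoded = rvq.lookup(indices)                                # (1, 50, 64)
    print(torch.allclose(decoded, quantized, atol=1e-5))         # expected: True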