# Source: ./RepCodec/repcodec/layers/vq_module.py
# (uploaded with huggingface_hub by KingNish, revision 5873fc1, 5.76 kB)
# Copyright (c) ByteDance, Inc. and its affiliates.
# Copyright (c) Chutong Meng
#
# This source code is licensed under the CC BY-NC license found in the
# LICENSE file in the root directory of this source tree.
# Based on AudioDec (https://github.com/facebookresearch/AudioDec)
import torch
import torch.nn as nn
import torch.nn.functional as F
class VectorQuantize(nn.Module):
    """Vector quantization w/ exponential moving averages (EMA).

    The codebook lives in the buffer ``embed`` of shape ``(dim, n_embed)``
    (one code per column).  In training mode ``forward`` refreshes it from
    the running statistics ``cluster_size`` and ``embed_avg``; no gradient
    reaches the codebook.

    Args:
        dim: Size of the last axis of the inputs and of every code vector.
        codebook_size: Number of codes, used when ``n_embed`` is ``None``.
        decay: EMA decay applied to the running statistics.
        commitment: Weight applied to the commitment loss from ``forward``.
        eps: Laplace-smoothing constant used when normalising cluster sizes.
        n_embed: Optional override for the number of codes.
    """

    def __init__(
            self,
            dim: int,
            codebook_size: int,
            decay=0.8,
            commitment=1.,
            eps=1e-5,
            n_embed=None,
    ):
        super().__init__()
        n_embed = self.default(n_embed, codebook_size)

        self.dim = dim
        self.n_embed = n_embed
        self.decay = decay
        self.eps = eps
        self.commitment = commitment

        # Buffers (not parameters): they are updated by EMA, not by an optimizer.
        embed = torch.randn(dim, n_embed)
        self.register_buffer('embed', embed)
        self.register_buffer('cluster_size', torch.zeros(n_embed))
        self.register_buffer('embed_avg', embed.clone())

    @property
    def codebook(self):
        """Codebook as a ``(n_embed, dim)`` view of the ``embed`` buffer."""
        return self.embed.transpose(0, 1)

    def exists(self, val):
        """Return ``True`` when ``val`` is not ``None``."""
        return val is not None

    def default(self, val, d):
        """Return ``val`` unless it is ``None``, in which case return ``d``."""
        return val if self.exists(val) else d

    def ema_inplace(self, moving_avg, new, decay):
        """Update ``moving_avg`` in place: ``decay * moving_avg + (1 - decay) * new``."""
        moving_avg.data.mul_(decay).add_(new, alpha=(1 - decay))

    def laplace_smoothing(self, x, n_categories, eps=1e-5):
        """Return ``(x + eps) / (x.sum() + n_categories * eps)``."""
        return (x + eps) / (x.sum() + n_categories * eps)

    def _nearest_code_indices(self, flatten):
        """Return, for each row of ``flatten``, the index of the closest code."""
        # The result is an integer index, so no autograd graph is needed here.
        with torch.no_grad():
            # Squared Euclidean distance expanded as |x|^2 - 2 x.e + |e|^2.
            dist = (
                flatten.pow(2).sum(1, keepdim=True)
                - 2 * flatten @ self.embed
                + self.embed.pow(2).sum(0, keepdim=True)
            )
            _, embed_ind = (-dist).max(1)
        return embed_ind

    def _assign(self, input):
        """Flatten ``input``, pick the nearest code per row and look it up.

        Returns ``(flatten, flat_ind, embed_ind, quantize)`` where ``flat_ind``
        has one entry per flattened row and ``embed_ind`` has the shape of
        ``input`` without its last axis.
        """
        flatten = input.reshape(-1, self.dim)
        flat_ind = self._nearest_code_indices(flatten)
        # Pass the shape object itself so that a 1-D input (empty leading
        # shape) yields a 0-dim index tensor instead of a bare ``view()`` call.
        embed_ind = flat_ind.view(input.shape[:-1])
        quantize = F.embedding(embed_ind, self.embed.transpose(0, 1))
        return flatten, flat_ind, embed_ind, quantize

    def forward(self, input):
        """Quantize ``input`` and, in training mode, update the codebook.

        Args:
            input: Tensor whose last axis has size ``dim``.

        Returns:
            Tuple ``(quantize, loss, perplexity)``: the quantized tensor with a
            straight-through gradient to ``input``, the commitment loss scaled
            by ``commitment``, and the perplexity of the code usage over the
            flattened rows of this call.
        """
        flatten, flat_ind, _, quantize = self._assign(input)
        embed_onehot = F.one_hot(flat_ind, self.n_embed).type(input.dtype)

        if self.training:
            # Buffer statistics only; nothing here should be tracked by autograd.
            with torch.no_grad():
                self.ema_inplace(self.cluster_size, embed_onehot.sum(0), self.decay)
                embed_sum = flatten.transpose(0, 1) @ embed_onehot
                self.ema_inplace(self.embed_avg, embed_sum, self.decay)
                cluster_size = (
                    self.laplace_smoothing(self.cluster_size, self.n_embed, self.eps)
                    * self.cluster_size.sum()
                )
                embed_normalized = self.embed_avg / cluster_size.unsqueeze(0)
                self.embed.data.copy_(embed_normalized)

        loss = F.mse_loss(quantize.detach(), input) * self.commitment
        # Straight-through estimator: forward value is the code, gradient is identity.
        quantize = input + (quantize - input).detach()

        avg_probs = torch.mean(embed_onehot, dim=0)
        perplexity = torch.exp(-torch.sum(avg_probs * torch.log(avg_probs + 1e-10)))

        return quantize, loss, perplexity

    def forward_index(self, input):
        """Quantize ``input`` without touching the codebook statistics.

        Args:
            input: Tensor whose last axis has size ``dim``.

        Returns:
            Tuple ``(quantize, embed_ind)``: the quantized tensor with a
            straight-through gradient to ``input``, and the chosen code
            indices shaped like ``input`` without its last axis.
        """
        _, _, embed_ind, quantize = self._assign(input)
        quantize = input + (quantize - input).detach()
        return quantize, embed_ind
class ResidualVQ(nn.Module):
    """Residual VQ following Algorithm 1 in https://arxiv.org/pdf/2107.03312.pdf.

    A stack of ``num_quantizers`` ``VectorQuantize`` layers, each quantizing
    what is left over from the previous ones; the layer outputs are summed.

    Args:
        num_quantizers: Number of stacked quantizer layers (keyword-only).
        **kwargs: Forwarded unchanged to every ``VectorQuantize`` layer.
    """

    def __init__(
            self,
            *,
            num_quantizers,
            **kwargs
    ):
        super().__init__()
        self.layers = nn.ModuleList([VectorQuantize(**kwargs) for _ in range(num_quantizers)])

    def forward(self, x):
        """Quantize ``x`` with every layer in turn.

        Returns:
            Tuple ``(quantized_out, all_losses, all_perplexities)``: the sum
            of the layer outputs, and the per-layer losses and perplexities
            stacked along a new leading axis.
        """
        quantized_out = 0.
        residual = x
        all_losses = []
        all_perplexities = []
        for layer in self.layers:
            quantized, loss, perplexity = layer(residual)
            # Issue: https://github.com/lucidrains/vector-quantize-pytorch/issues/33
            # We found considering only the 1st layer VQ's gradient results in better performance
            # residual = residual - quantized.detach()  # considering all layers' gradients
            residual = residual - quantized  # considering only the first layer's gradient
            quantized_out = quantized_out + quantized

            all_losses.append(loss)
            all_perplexities.append(perplexity)

        all_losses, all_perplexities = map(torch.stack, (all_losses, all_perplexities))
        return quantized_out, all_losses, all_perplexities

    def forward_index(self, x, flatten_idx=False):
        """Quantize ``x`` and return the code indices chosen by every layer.

        Args:
            x: Tensor passed to the first layer's ``forward_index``.
            flatten_idx: When true, layer ``i``'s indices are shifted by
                ``i * layer.n_embed`` so that they address the flattened
                codebook built by ``initial()``.  ``initial()`` does not need
                to have been called for this to work.

        Returns:
            Tuple ``(quantized_out, all_indices)``: the sum of the layer
            outputs, and the per-layer indices stacked along a new leading
            axis with axis 1 squeezed when it has size 1 (presumably a batch
            axis of one -- confirm against callers).
        """
        quantized_out = 0.
        residual = x
        all_indices = []
        for i, layer in enumerate(self.layers):
            quantized, indices = layer.forward_index(residual)
            # residual = residual - quantized.detach()
            residual = residual - quantized
            quantized_out = quantized_out + quantized
            if flatten_idx:
                # Derive the offset from the layer itself instead of
                # ``self.codebook_size``, which only exists after ``initial()``.
                # Non in-place add so the layer's returned tensor is not mutated.
                indices = indices + layer.n_embed * i

            all_indices.append(indices)

        all_indices = torch.stack(all_indices)
        return quantized_out, all_indices.squeeze(1)

    def initial(self):
        """Snapshot all layer codebooks into one flattened lookup table.

        Sets ``self.codebook_size`` to the number of codes in the first layer
        and ``self.codebook`` to the layer codebooks stacked and reshaped to
        two dimensions, so layer ``i``'s code ``k`` sits at row
        ``i * codebook_size + k``.
        """
        per_layer = [layer.codebook for layer in self.layers]
        self.codebook_size = per_layer[0].size(0)
        stacked = torch.stack(per_layer)
        self.codebook = stacked.reshape(-1, stacked.size(-1))

    def lookup(self, indices):
        """Sum the code vectors addressed by flattened ``indices``.

        Requires ``initial()`` to have been called first, because it reads
        ``self.codebook``.  The embeddings are summed over axis 0 of
        ``indices`` (kept as a size-1 axis).
        """
        quantized_out = F.embedding(indices, self.codebook)  # Num x T x C
        return torch.sum(quantized_out, dim=0, keepdim=True)