OSUM / wenet /squeezeformer /subsampling.py
tomxxie
适配zeroGPU
568e264
raw
history blame
11.6 kB
# Copyright (c) 2022 Ximalaya Inc. (authors: Yuguang Yang)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modified from Squeezeformer(https://github.com/kssteven418/Squeezeformer)
# Squeezeformer(https://github.com/upskyy/Squeezeformer)
# NeMo(https://github.com/NVIDIA/NeMo)
"""DepthwiseConv2dSubsampling4 and TimeReductionLayer definition."""
import torch
import torch.nn as nn
import torch.nn.functional as F
from wenet.transformer.subsampling import BaseSubsampling
from typing import Tuple
from wenet.squeezeformer.conv2d import Conv2dValid
class DepthwiseConv2dSubsampling4(BaseSubsampling):
    """Reduce the time axis to roughly 1/4 with two stride-2 3x3 convolutions.

    The input features are treated as a one-channel image, passed through
    two ``Conv2d`` + ``ReLU`` stages, flattened over (channel, frequency),
    position-encoded and finally projected to ``odim``.

    Args:
        idim (int): Number of input channels of the first convolution.
            ``forward`` feeds it a single-channel tensor.
        odim (int): Output dimension.
        pos_enc_class (torch.nn.Module): Positional-encoding module; it is
            called as ``pos_enc_class(x, offset)`` and must return
            ``(x, pos_emb)``.
        dw_stride (bool): If True, the second convolution is depthwise
            (``groups=odim``); otherwise it is a regular convolution.
        input_size (int): Filter-bank (feature) dimension of the input.
        input_dropout_rate (float): Dropout probability applied after the
            linear projection.
        init_weights (bool): If True, the linear projection is
            re-initialised from a uniform distribution.
    """

    def __init__(self,
                 idim: int,
                 odim: int,
                 pos_enc_class: torch.nn.Module,
                 dw_stride: bool = False,
                 input_size: int = 80,
                 input_dropout_rate: float = 0.1,
                 init_weights: bool = True):
        super().__init__()
        self.idim = idim
        self.odim = odim

        # Stage 1: kernel 3, stride 2, no padding.
        self.pw_conv = nn.Conv2d(in_channels=idim,
                                 out_channels=odim,
                                 kernel_size=3,
                                 stride=2)
        self.act1 = nn.ReLU()

        # Stage 2: same geometry, optionally one group per channel.
        conv_groups = odim if dw_stride else 1
        self.dw_conv = nn.Conv2d(in_channels=odim,
                                 out_channels=odim,
                                 kernel_size=3,
                                 stride=2,
                                 groups=conv_groups)
        self.act2 = nn.ReLU()

        self.pos_enc = pos_enc_class

        # Frequency bins that remain after the two unpadded stride-2 convs.
        reduced_bins = ((input_size - 1) // 2 - 1) // 2
        self.input_proj = nn.Sequential(
            nn.Linear(odim * reduced_bins, odim),
            nn.Dropout(p=input_dropout_rate),
        )

        if init_weights:
            bound = (odim * input_size / 4)**-0.5
            projection = self.input_proj[0]
            torch.nn.init.uniform_(projection.weight, -bound, bound)
            torch.nn.init.uniform_(projection.bias, -bound, bound)

        self.subsampling_rate = 4
        # 6 = (3 - 1) * 1 + (3 - 1) * 2
        self.right_context = 6

    def forward(
            self,
            x: torch.Tensor,
            x_mask: torch.Tensor,
            offset: int = 0
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Subsample ``x`` and its mask.

        Args:
            x (torch.Tensor): Input features, (batch, time, feature).
            x_mask (torch.Tensor): Mask whose last axis is time
                (presumably (batch, 1, time) -- confirm with the caller).
            offset (int): Offset forwarded to the positional encoding.

        Returns:
            Tuple of the projected features, the positional embedding
            returned by ``pos_enc`` and the mask subsampled twice with
            ``[:, :, :-2:2]``.
        """
        feats = x.unsqueeze(1)  # (b, c=1, t, f)
        feats = self.act1(self.pw_conv(feats))
        feats = self.act2(self.dw_conv(feats))

        batch, channels, frames, bins = feats.size()
        # Bring time in front of channel, then merge channel and frequency.
        feats = feats.permute(0, 2, 1, 3).contiguous()
        feats = feats.view(batch, frames, channels * bins)

        feats, pos_emb = self.pos_enc(feats, offset)
        feats = self.input_proj(feats)

        # Each slice mirrors one kernel-3 / stride-2 convolution.
        reduced_mask = x_mask[:, :, :-2:2][:, :, :-2:2]
        return feats, pos_emb, reduced_mask
class TimeReductionLayer1D(nn.Module):
    """
    Modified NeMo,
    Squeezeformer Time Reduction procedure.
    Downsamples the audio by `stride` in the time dimension.

    A depthwise ``Conv1d`` (kernel ``kernel_size``, stride ``stride``,
    padding ``max(0, kernel_size - stride)``) is followed by a pointwise
    ``Conv1d`` mapping ``channel`` to ``out_dim``.

    Args:
        channel (int): input dimension of
            MultiheadAttentionMechanism and PositionwiseFeedForward
        out_dim (int): Output dimension of the module.
        kernel_size (int): Conv kernel size for
            depthwise convolution in convolution module
        stride (int): Downsampling factor in time dimension.
    """

    def __init__(self,
                 channel: int,
                 out_dim: int,
                 kernel_size: int = 5,
                 stride: int = 2):
        super(TimeReductionLayer1D, self).__init__()

        self.channel = channel
        self.out_dim = out_dim
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = max(0, self.kernel_size - self.stride)

        self.dw_conv = nn.Conv1d(
            in_channels=channel,
            out_channels=channel,
            kernel_size=kernel_size,
            stride=stride,
            padding=self.padding,
            groups=channel,
        )

        self.pw_conv = nn.Conv1d(
            in_channels=channel,
            out_channels=out_dim,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
        )

        self.init_weights()

    def init_weights(self):
        """Uniformly initialise both convolutions.

        The depthwise bound is ``kernel_size ** -0.5`` and the pointwise
        bound is ``channel ** -0.5``.
        """
        dw_max = self.kernel_size**-0.5
        pw_max = self.channel**-0.5
        torch.nn.init.uniform_(self.dw_conv.weight, -dw_max, dw_max)
        torch.nn.init.uniform_(self.dw_conv.bias, -dw_max, dw_max)
        torch.nn.init.uniform_(self.pw_conv.weight, -pw_max, pw_max)
        torch.nn.init.uniform_(self.pw_conv.bias, -pw_max, pw_max)

    def forward(
        self,
        xs: torch.Tensor,
        xs_lens: torch.Tensor,
        mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
        mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Downsample ``xs`` and the accompanying lengths and masks.

        Args:
            xs (torch.Tensor): Input of shape [B, T, C].
            xs_lens (torch.Tensor): Integer lengths of the sequences.
            mask (torch.Tensor): Mask subsampled on its last two axes
                (presumably [B, T, T] -- confirm with the caller).
            mask_pad (torch.Tensor): Padding mask subsampled on its last
                axis; zero entries mark frames that are zeroed before the
                convolution (presumably [B, 1, T] -- confirm with caller).

        Returns:
            ``(xs, xs_lens, mask, mask_pad)`` where ``xs`` has as many
            frames as the subsampled ``mask_pad`` and ``xs_lens`` is
            ``ceil(xs_lens / stride)``.
        """
        xs = xs.transpose(1, 2)  # [B, C, T]
        xs = xs.masked_fill(mask_pad.eq(0), 0.0)

        xs = self.dw_conv(xs)
        xs = self.pw_conv(xs)

        xs = xs.transpose(1, 2)  # [B, T, C]

        B, T, D = xs.size()
        mask = mask[:, ::self.stride, ::self.stride]
        mask_pad = mask_pad[:, :, ::self.stride]
        L = mask_pad.size(-1)
        # For JIT exporting, we remove F.pad operator.
        if L - T < 0:
            # Convolution produced more frames than the mask: trim the tail.
            xs = xs[:, :L - T, :].contiguous()
        else:
            # Pad with zeros of the same dtype so that reduced-precision
            # activations are not promoted (or rejected) by torch.cat.
            dummy_pad = torch.zeros(B,
                                    L - T,
                                    D,
                                    dtype=xs.dtype,
                                    device=xs.device)
            xs = torch.cat([xs, dummy_pad], dim=1)

        # ceil(len / stride): matches the `::stride` slicing of the masks.
        # For the default stride of 2 this equals (len + 1) // 2.
        xs_lens = torch.div(xs_lens + self.stride - 1,
                            self.stride,
                            rounding_mode='trunc')
        return xs, xs_lens, mask, mask_pad
class TimeReductionLayer2D(nn.Module):
    """Squeezeformer time reduction implemented with 2-D convolutions.

    Downsamples the time dimension by `stride` using a ``Conv2dValid`` with
    a ``(kernel_size, 1)`` kernel followed by a 1x1 ``Conv2dValid``; both
    keep ``encoder_dim`` channels.

    Args:
        kernel_size (int): Kernel length along the time axis.
        stride (int): Downsampling factor in time dimension.
        encoder_dim (int): Number of input and output channels.
    """

    def __init__(self,
                 kernel_size: int = 5,
                 stride: int = 2,
                 encoder_dim: int = 256):
        super(TimeReductionLayer2D, self).__init__()
        self.encoder_dim = encoder_dim
        self.kernel_size = kernel_size
        self.stride = stride
        self.dw_conv = Conv2dValid(in_channels=encoder_dim,
                                   out_channels=encoder_dim,
                                   kernel_size=(kernel_size, 1),
                                   stride=stride,
                                   valid_trigy=True)
        self.pw_conv = Conv2dValid(
            in_channels=encoder_dim,
            out_channels=encoder_dim,
            kernel_size=1,
            stride=1,
            valid_trigx=False,
            valid_trigy=False,
        )

        self.init_weights()

    def init_weights(self):
        """Uniformly initialise both convolutions.

        The first convolution uses the bound ``kernel_size ** -0.5`` and the
        1x1 convolution uses ``encoder_dim ** -0.5``.
        """
        dw_max = self.kernel_size**-0.5
        pw_max = self.encoder_dim**-0.5
        torch.nn.init.uniform_(self.dw_conv.weight, -dw_max, dw_max)
        torch.nn.init.uniform_(self.dw_conv.bias, -dw_max, dw_max)
        torch.nn.init.uniform_(self.pw_conv.weight, -pw_max, pw_max)
        torch.nn.init.uniform_(self.pw_conv.bias, -pw_max, pw_max)

    def forward(
        self,
        xs: torch.Tensor,
        xs_lens: torch.Tensor,
        mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
        mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Downsample ``xs`` and the accompanying lengths and masks.

        Args:
            xs (torch.Tensor): Input of shape [B, T, D].
            xs_lens (torch.Tensor): Integer lengths of the sequences.
            mask (torch.Tensor): Mask subsampled on its last two axes
                (presumably [B, T, T] -- confirm with the caller).
            mask_pad (torch.Tensor): Padding mask subsampled on its last
                axis; after ``transpose(1, 2)`` its zero entries select the
                frames of ``xs`` that are zeroed (presumably [B, 1, T]).

        Returns:
            ``(xs, xs_lens, mask, mask_pad)`` where ``xs_lens`` is
            ``ceil(xs_lens / stride)`` and ``xs`` is zero-padded up to
            ``xs_lens.max()`` frames when the convolution yields fewer.
        """
        xs = xs.masked_fill(mask_pad.transpose(1, 2).eq(0), 0.0)
        xs = xs.unsqueeze(2)  # [B, T, 1, D]
        # Pad only the end of the time axis (dim 1) before the convolution.
        padding1 = self.kernel_size - self.stride
        xs = F.pad(xs, (0, 0, 0, 0, 0, padding1, 0, 0),
                   mode='constant',
                   value=0.)
        xs = self.dw_conv(xs.permute(0, 3, 1, 2))  # [B, D, T', 1]
        xs = self.pw_conv(xs).permute(0, 3, 2, 1).squeeze(1).contiguous()
        tmp_length = xs.size(1)

        # ceil(len / stride); equals (len + 1) // 2 for the default stride.
        xs_lens = torch.div(xs_lens + self.stride - 1,
                            self.stride,
                            rounding_mode='trunc')
        padding2 = max(0, int((xs_lens.max() - tmp_length).item()))
        batch_size, hidden = xs.size(0), xs.size(-1)
        # Same dtype as xs so reduced-precision activations are preserved.
        dummy_pad = torch.zeros(batch_size,
                                padding2,
                                hidden,
                                dtype=xs.dtype,
                                device=xs.device)
        xs = torch.cat([xs, dummy_pad], dim=1)

        # Subsample the masks with the same factor as the convolution.
        mask = mask[:, ::self.stride, ::self.stride]
        mask_pad = mask_pad[:, :, ::self.stride]
        return xs, xs_lens, mask, mask_pad
class TimeReductionLayerStream(nn.Module):
    """
    Squeezeformer Time Reduction procedure.
    Downsamples the audio by `stride` in the time dimension.

    Unlike the padded 1-D variant, the depthwise ``Conv1d`` here uses
    ``padding=0`` and a default ``kernel_size`` of 1. It is followed by a
    pointwise ``Conv1d`` mapping ``channel`` to ``out_dim``.

    Args:
        channel (int): input dimension of
            MultiheadAttentionMechanism and PositionwiseFeedForward
        out_dim (int): Output dimension of the module.
        kernel_size (int): Conv kernel size for
            depthwise convolution in convolution module
        stride (int): Downsampling factor in time dimension.
    """

    def __init__(self,
                 channel: int,
                 out_dim: int,
                 kernel_size: int = 1,
                 stride: int = 2):
        super(TimeReductionLayerStream, self).__init__()

        self.channel = channel
        self.out_dim = out_dim
        self.kernel_size = kernel_size
        self.stride = stride

        self.dw_conv = nn.Conv1d(
            in_channels=channel,
            out_channels=channel,
            kernel_size=kernel_size,
            stride=stride,
            padding=0,
            groups=channel,
        )

        self.pw_conv = nn.Conv1d(
            in_channels=channel,
            out_channels=out_dim,
            kernel_size=1,
            stride=1,
            padding=0,
            groups=1,
        )

        self.init_weights()

    def init_weights(self):
        """Uniformly initialise both convolutions.

        The depthwise bound is ``kernel_size ** -0.5`` and the pointwise
        bound is ``channel ** -0.5``.
        """
        dw_max = self.kernel_size**-0.5
        pw_max = self.channel**-0.5
        torch.nn.init.uniform_(self.dw_conv.weight, -dw_max, dw_max)
        torch.nn.init.uniform_(self.dw_conv.bias, -dw_max, dw_max)
        torch.nn.init.uniform_(self.pw_conv.weight, -pw_max, pw_max)
        torch.nn.init.uniform_(self.pw_conv.bias, -pw_max, pw_max)

    def forward(
        self,
        xs: torch.Tensor,
        xs_lens: torch.Tensor,
        mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
        mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Downsample ``xs`` and the accompanying lengths and masks.

        Args:
            xs (torch.Tensor): Input of shape [B, T, C].
            xs_lens (torch.Tensor): Integer lengths of the sequences.
            mask (torch.Tensor): Mask subsampled on its last two axes
                (presumably [B, T, T] -- confirm with the caller).
            mask_pad (torch.Tensor): Padding mask subsampled on its last
                axis; zero entries mark frames that are zeroed before the
                convolution (presumably [B, 1, T] -- confirm with caller).

        Returns:
            ``(xs, xs_lens, mask, mask_pad)`` where ``xs`` has as many
            frames as the subsampled ``mask_pad`` and ``xs_lens`` is
            ``ceil(xs_lens / stride)``.
        """
        xs = xs.transpose(1, 2)  # [B, C, T]
        xs = xs.masked_fill(mask_pad.eq(0), 0.0)

        xs = self.dw_conv(xs)
        xs = self.pw_conv(xs)

        xs = xs.transpose(1, 2)  # [B, T, C]

        B, T, D = xs.size()
        mask = mask[:, ::self.stride, ::self.stride]
        mask_pad = mask_pad[:, :, ::self.stride]
        L = mask_pad.size(-1)
        # For JIT exporting, we remove F.pad operator.
        if L - T < 0:
            # Convolution produced more frames than the mask: trim the tail.
            xs = xs[:, :L - T, :].contiguous()
        else:
            # Pad with zeros of the same dtype so that reduced-precision
            # activations are not promoted (or rejected) by torch.cat.
            dummy_pad = torch.zeros(B,
                                    L - T,
                                    D,
                                    dtype=xs.dtype,
                                    device=xs.device)
            xs = torch.cat([xs, dummy_pad], dim=1)

        # ceil(len / stride): matches the `::stride` slicing of the masks.
        # For the default stride of 2 this equals (len + 1) // 2.
        xs_lens = torch.div(xs_lens + self.stride - 1,
                            self.stride,
                            rounding_mode='trunc')
        return xs, xs_lens, mask, mask_pad