import torch
import torch.nn.functional as F

import causal_conv1d_cuda


class CausalConv1dFn(torch.autograd.Function):
    """Autograd wrapper around the causal depthwise conv1d CUDA kernels."""

    @staticmethod
    def forward(ctx, x, weight, bias=None, activation=None):
        if activation not in [None, "silu", "swish"]:
            raise NotImplementedError("activation must be None, silu, or swish")
        # The kernel needs either the last or the second-to-last dimension of x
        # to be contiguous.
        if x.stride(2) != 1 and x.stride(1) != 1:
            x = x.contiguous()
        bias = bias.contiguous() if bias is not None else None
        ctx.save_for_backward(x, weight, bias)
        # The kernel takes a boolean flag: whether to apply SiLU/Swish to the output.
        ctx.activation = activation in ["silu", "swish"]
        out = causal_conv1d_cuda.causal_conv1d_fwd(x, weight, bias, ctx.activation)
        return out

    @staticmethod
    def backward(ctx, dout):
        x, weight, bias = ctx.saved_tensors
        if dout.stride(2) != 1 and dout.stride(1) != 1:
            dout = dout.contiguous()
        # The None argument is an optional pre-allocated dx buffer; passing None
        # lets the kernel allocate dx itself.
        dx, dweight, dbias = causal_conv1d_cuda.causal_conv1d_bwd(
            x, weight, bias, dout, None, ctx.activation
        )
        # Gradients correspond to the forward inputs (x, weight, bias, activation);
        # activation is not a tensor, so its gradient slot is None.
        return dx, dweight, dbias if bias is not None else None, None


def causal_conv1d_fn(x, weight, bias=None, activation=None):
    """
    x: (batch, dim, seqlen)
    weight: (dim, width)
    bias: (dim,)
    activation: either None or "silu" or "swish"

    out: (batch, dim, seqlen)
    """
    return CausalConv1dFn.apply(x, weight, bias, activation)
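

# Illustrative usage sketch (not part of the library): run the CUDA fast path and
# compare it against the pure-PyTorch reference defined below. The shapes, dtypes,
# and tolerances are arbitrary example choices; it assumes a CUDA device and the
# compiled causal_conv1d_cuda extension are available.
def _example_fn_vs_ref():
    torch.manual_seed(0)
    batch, dim, seqlen, width = 2, 64, 128, 4
    x = torch.randn(batch, dim, seqlen, device="cuda", dtype=torch.float32)
    weight = torch.randn(dim, width, device="cuda", dtype=torch.float32)
    bias = torch.randn(dim, device="cuda", dtype=torch.float32)
    out = causal_conv1d_fn(x, weight, bias, activation="silu")
    out_ref = causal_conv1d_ref(x, weight, bias, activation="silu")
    # The fused kernel and F.conv1d may accumulate in different orders, so allow
    # a small tolerance.
    assert torch.allclose(out, out_ref, atol=1e-4, rtol=1e-4)
    return out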


def causal_conv1d_ref(x, weight, bias=None, activation=None):
    """
    x: (batch, dim, seqlen)
    weight: (dim, width)
    bias: (dim,)

    out: (batch, dim, seqlen)
    """
    if activation not in [None, "silu", "swish"]:
        raise NotImplementedError("activation must be None, silu, or swish")
    dtype_in = x.dtype
    x = x.to(weight.dtype)
    seqlen = x.shape[-1]
    dim, width = weight.shape
    # Depthwise (groups=dim) conv1d, zero-padded by width - 1 on both sides;
    # keeping only the first seqlen outputs makes the convolution causal.
    out = F.conv1d(x, weight.unsqueeze(1), bias, padding=width - 1, groups=dim)
    out = out[..., :seqlen]
    return (out if activation is None else F.silu(out)).to(dtype=dtype_in)
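

# Small causality check for the reference implementation (illustrative only; runs
# on CPU and needs nothing beyond PyTorch): perturbing the input from time step t
# onward must leave all outputs before t unchanged, because each output position
# only sees the current and the previous width - 1 inputs.
def _example_ref_is_causal():
    torch.manual_seed(0)
    batch, dim, seqlen, width = 2, 8, 32, 4
    x = torch.randn(batch, dim, seqlen)
    weight = torch.randn(dim, width)
    out = causal_conv1d_ref(x, weight)
    x_perturbed = x.clone()
    x_perturbed[:, :, seqlen // 2 :] += 1.0  # change only the "future" half
    out_perturbed = causal_conv1d_ref(x_perturbed, weight)
    # Outputs strictly before the perturbed region are identical.
    assert torch.equal(out[..., : seqlen // 2], out_perturbed[..., : seqlen // 2])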


def causal_conv1d_update(x, conv_state, weight, bias=None, activation=None):
    """
    x: (batch, dim)
    conv_state: (batch, dim, width)
    weight: (dim, width)
    bias: (dim,)

    out: (batch, dim)
    """
    if activation not in [None, "silu", "swish"]:
        raise NotImplementedError("activation must be None, silu, or swish")
    activation = activation in ["silu", "swish"]
    return causal_conv1d_cuda.causal_conv1d_update(x, conv_state, weight, bias, activation)
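

# Illustrative decoding-step sketch (not part of the library): during autoregressive
# inference, conv_state holds the last `width` inputs per channel and one new token
# is fed per call; the state is expected to be advanced in place, as the reference
# implementation below does. Assumes a CUDA device and the compiled
# causal_conv1d_cuda extension; shapes and dtypes are arbitrary example choices.
def _example_decode_steps(num_steps=8):
    batch, dim, width = 2, 64, 4
    conv_state = torch.zeros(batch, dim, width, device="cuda", dtype=torch.float32)
    weight = torch.randn(dim, width, device="cuda", dtype=torch.float32)
    bias = torch.randn(dim, device="cuda", dtype=torch.float32)
    outs = []
    for _ in range(num_steps):
        x_t = torch.randn(batch, dim, device="cuda", dtype=torch.float32)
        outs.append(causal_conv1d_update(x_t, conv_state, weight, bias, activation="silu"))
    return torch.stack(outs, dim=-1)  # (batch, dim, num_steps)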


def causal_conv1d_update_ref(x, conv_state, weight, bias=None, activation=None):
    """
    x: (batch, dim)
    conv_state: (batch, dim, width)
    weight: (dim, width)
    bias: (dim,)

    out: (batch, dim)
    """
    if activation not in [None, "silu", "swish"]:
        raise NotImplementedError("activation must be None, silu, or swish")
    dtype_in = x.dtype
    batch, dim = x.shape
    width = weight.shape[1]
    assert conv_state.shape == (batch, dim, width)
    assert weight.shape == (dim, width)
    # Shift the state one step into the past (in place) and append the new input
    # as the most recent time step.
    conv_state.copy_(torch.roll(conv_state, shifts=-1, dims=-1))
    conv_state[:, :, -1] = x
    # Dot product along the width dimension: a single step of the causal
    # depthwise convolution.
    out = torch.sum(conv_state * weight, dim=-1)
    if bias is not None:
        out += bias
    return (out if activation is None else F.silu(out)).to(dtype=dtype_in)
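

# Illustrative consistency check (not part of the library; CPU only): feeding a
# sequence one step at a time through causal_conv1d_update_ref, starting from a
# zero conv_state, should reproduce the full-sequence output of causal_conv1d_ref,
# since the zero state plays the role of the zero left-padding. Names and shapes
# here are arbitrary example choices.
def _example_update_ref_matches_full_ref():
    torch.manual_seed(0)
    batch, dim, seqlen, width = 2, 8, 16, 4
    x = torch.randn(batch, dim, seqlen)
    weight = torch.randn(dim, width)
    bias = torch.randn(dim)
    out_full = causal_conv1d_ref(x, weight, bias, activation="silu")
    conv_state = torch.zeros(batch, dim, width)
    out_steps = torch.stack(
        [
            causal_conv1d_update_ref(x[:, :, t], conv_state, weight, bias, activation="silu")
            for t in range(seqlen)
        ],
        dim=-1,
    )
    assert torch.allclose(out_full, out_steps, atol=1e-6)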