import math import torch import torch.nn as nn from einops import rearrange, repeat class ImageProjModel(nn.Module): """Projection Model""" def __init__( self, cross_attention_dim=1024, clip_embeddings_dim=1024, clip_extra_context_tokens=4, ): super().__init__() self.cross_attention_dim = cross_attention_dim self.clip_extra_context_tokens = clip_extra_context_tokens self.proj = nn.Linear( clip_embeddings_dim, self.clip_extra_context_tokens * cross_attention_dim ) self.norm = nn.LayerNorm(cross_attention_dim) def forward(self, image_embeds): # embeds = image_embeds embeds = image_embeds.type(list(self.proj.parameters())[0].dtype) clip_extra_context_tokens = self.proj(embeds).reshape( -1, self.clip_extra_context_tokens, self.cross_attention_dim ) clip_extra_context_tokens = self.norm(clip_extra_context_tokens) return clip_extra_context_tokens # FFN def FeedForward(dim, mult=4): inner_dim = int(dim * mult) return nn.Sequential( nn.LayerNorm(dim), nn.Linear(dim, inner_dim, bias=False), nn.GELU(), nn.Linear(inner_dim, dim, bias=False), ) def reshape_tensor(x, heads): bs, length, width = x.shape # (bs, length, width) --> (bs, length, n_heads, dim_per_head) x = x.view(bs, length, heads, -1) # (bs, length, n_heads, dim_per_head) --> (bs, n_heads, length, dim_per_head) x = x.transpose(1, 2) # (bs, n_heads, length, dim_per_head) --> (bs*n_heads, length, dim_per_head) x = x.reshape(bs, heads, length, -1) return x class PerceiverAttention(nn.Module): def __init__(self, *, dim, dim_head=64, heads=8): super().__init__() self.scale = dim_head**-0.5 self.dim_head = dim_head self.heads = heads inner_dim = dim_head * heads self.norm1 = nn.LayerNorm(dim) self.norm2 = nn.LayerNorm(dim) self.to_q = nn.Linear(dim, inner_dim, bias=False) self.to_kv = nn.Linear(dim, inner_dim * 2, bias=False) self.to_out = nn.Linear(inner_dim, dim, bias=False) def forward(self, x, latents): """ Args: x (torch.Tensor): image features shape (b, n1, D) latent (torch.Tensor): latent features shape (b, n2, D) """ x = self.norm1(x) latents = self.norm2(latents) b, l, _ = latents.shape q = self.to_q(latents) kv_input = torch.cat((x, latents), dim=-2) k, v = self.to_kv(kv_input).chunk(2, dim=-1) q = reshape_tensor(q, self.heads) k = reshape_tensor(k, self.heads) v = reshape_tensor(v, self.heads) # attention scale = 1 / math.sqrt(math.sqrt(self.dim_head)) # More stable with f16 than dividing afterwards weight = (q * scale) @ (k * scale).transpose(-2, -1) weight = torch.softmax(weight.float(), dim=-1).type(weight.dtype) out = weight @ v out = out.permute(0, 2, 1, 3).reshape(b, l, -1) return self.to_out(out) class Resampler(nn.Module): def __init__( self, dim=1024, depth=8, dim_head=64, heads=16, num_queries=8, embedding_dim=768, output_dim=1024, ff_mult=4, video_length=None, ): super().__init__() self.num_queries = num_queries self.video_length = video_length if video_length is not None: num_queries = num_queries * video_length self.latents = nn.Parameter(torch.randn(1, num_queries, dim) / dim**0.5) self.proj_in = nn.Linear(embedding_dim, dim) self.proj_out = nn.Linear(dim, output_dim) self.norm_out = nn.LayerNorm(output_dim) self.layers = nn.ModuleList([]) for _ in range(depth): self.layers.append( nn.ModuleList( [ PerceiverAttention(dim=dim, dim_head=dim_head, heads=heads), FeedForward(dim=dim, mult=ff_mult), ] ) ) def forward(self, x): latents = self.latents.repeat(x.size(0), 1, 1) # B (T L) C x = self.proj_in(x) for attn, ff in self.layers: latents = attn(x, latents) + latents latents = ff(latents) + latents latents = self.proj_out(latents) latents = self.norm_out(latents) # B L C or B (T L) C return latents class CameraPoseQueryTransformer(nn.Module): def __init__( self, dim=1024, depth=8, dim_head=64, heads=16, num_queries=8, embedding_dim=768, output_dim=1024, ff_mult=4, num_views=None, use_multi_view_attention=True, ): super().__init__() self.num_queries = num_queries self.num_views = num_views assert num_views is not None, "video_length must be given." self.use_multi_view_attention = use_multi_view_attention self.camera_pose_embedding_layers = nn.Sequential( nn.Linear(12, dim), nn.SiLU(), nn.Linear(dim, dim), nn.SiLU(), nn.Linear(dim, dim), ) nn.init.zeros_(self.camera_pose_embedding_layers[-1].weight) nn.init.zeros_(self.camera_pose_embedding_layers[-1].bias) self.latents = nn.Parameter( torch.randn(1, num_views * num_queries, dim) / dim**0.5 ) self.proj_in = nn.Linear(embedding_dim, dim) self.proj_out = nn.Linear(dim, output_dim) self.norm_out = nn.LayerNorm(output_dim) self.layers = nn.ModuleList([]) for _ in range(depth): self.layers.append( nn.ModuleList( [ PerceiverAttention(dim=dim, dim_head=dim_head, heads=heads), FeedForward(dim=dim, mult=ff_mult), ] ) ) def forward(self, x, camera_poses): # camera_poses: (b, t, 12) batch_size, num_views, _ = camera_poses.shape # latents: (1, t*q, d) -> (b, t*q, d) latents = self.latents.repeat(batch_size, 1, 1) x = self.proj_in(x) # camera_poses: (b*t, 12) camera_poses = rearrange(camera_poses, "b t d -> (b t) d", t=num_views) camera_poses = self.camera_pose_embedding_layers( camera_poses ) # camera_poses: (b*t, d) # camera_poses: (b, t, d) camera_poses = rearrange(camera_poses, "(b t) d -> b t d", t=num_views) # camera_poses: (b, t*q, d) camera_poses = repeat(camera_poses, "b t d -> b (t q) d", q=self.num_queries) latents = latents + camera_poses # b, t*q, d latents = rearrange( latents, "b (t q) d -> (b t) q d", b=batch_size, t=num_views, q=self.num_queries, ) # (b*t, q, d) _, x_seq_size, _ = x.shape for layer_idx, (attn, ff) in enumerate(self.layers): if self.use_multi_view_attention and layer_idx % 2 == 1: # latents: (b*t, q, d) latents = rearrange( latents, "(b t) q d -> b (t q) d", b=batch_size, t=num_views, q=self.num_queries, ) # x: (b*t, s, d) x = rearrange( x, "(b t) s d -> b (t s) d", b=batch_size, t=num_views, s=x_seq_size ) # print("After rearrange: latents.shape=", latents.shape) # print("After rearrange: x.shape=", camera_poses.shape) latents = attn(x, latents) + latents latents = ff(latents) + latents if self.use_multi_view_attention and layer_idx % 2 == 1: # latents: (b*q, t, d) latents = rearrange( latents, "b (t q) d -> (b t) q d", b=batch_size, t=num_views, q=self.num_queries, ) # x: (b*s, t, d) x = rearrange( x, "b (t s) d -> (b t) s d", b=batch_size, t=num_views, s=x_seq_size ) latents = self.proj_out(latents) latents = self.norm_out(latents) # B L C or B (T L) C return latents