Spaces:
Running
Running
from typing import Union | |
import PIL | |
import PIL.Image | |
import torch | |
import torch.nn.functional as F | |
from einops import rearrange | |
from torch import nn | |
from torchvision.transforms.v2 import ( | |
Compose, | |
InterpolationMode, | |
Normalize, | |
Resize, | |
ToDtype, | |
ToImage, | |
) | |
from transformers.utils import is_flash_attn_2_available | |
try: | |
if is_flash_attn_2_available(): | |
from flash_attn.modules.mha import FlashSelfAttention | |
else: | |
FlashSelfAttention = None | |
except ImportError: | |
FlashSelfAttention = None | |
class Attention(nn.Module): | |
def __init__(self, dim, num_heads=16, use_flash_attn=False): | |
super().__init__() | |
assert dim % num_heads == 0, "dim should be divisible by num_heads" | |
self.num_heads = num_heads | |
self.head_dim = dim // num_heads | |
self.qkv = nn.Linear(dim, dim * 3) | |
self.proj = nn.Linear(dim, dim) | |
if use_flash_attn and FlashSelfAttention is not None: | |
self.flash_attn = FlashSelfAttention() | |
else: | |
self.flash_attn = None | |
torch.nn.init.kaiming_normal_( | |
self.qkv.weight, mode="fan_in", nonlinearity="relu" | |
) | |
torch.nn.init.kaiming_normal_( | |
self.proj.weight, mode="fan_in", nonlinearity="relu" | |
) | |
def forward(self, x: torch.Tensor) -> torch.Tensor: | |
if self.flash_attn is not None: | |
qkv = self.qkv(x) | |
qkv = rearrange( | |
qkv, "... (three h d) -> ... three h d", three=3, h=self.num_heads | |
) | |
attn_output = self.flash_attn(qkv) | |
output = rearrange(attn_output, "... h d -> ... (h d)") | |
output = self.proj(output) | |
return output | |
else: | |
B, N, C = x.shape | |
qkv = ( | |
self.qkv(x) | |
.reshape(B, N, 3, self.num_heads, self.head_dim) | |
.permute(2, 0, 3, 1, 4) | |
) | |
q, k, v = qkv.unbind(0) | |
x = F.scaled_dot_product_attention(q, k, v) | |
x = x.transpose(1, 2).reshape(B, N, C) | |
x = self.proj(x) | |
return x | |
class VitBlock(nn.Module): | |
def __init__(self, embed_dim, use_flash_attn=False): | |
super().__init__() | |
self.attn = Attention(embed_dim, use_flash_attn=use_flash_attn) | |
self.mlp = MLP(embed_dim, 4304) | |
self.norm1 = nn.LayerNorm(embed_dim) | |
self.norm2 = nn.LayerNorm(embed_dim) | |
def forward(self, x): | |
x = x + self.attn(self.norm1(x)) | |
x = x + self.mlp(self.norm2(x)) | |
return x | |
class VisionTransformer(nn.Module): | |
def __init__(self, use_flash_attn=False): | |
super().__init__() | |
embed_len = 729 | |
embed_dim = 1152 | |
self.patch_embed = LinearPatchEmbedding() | |
self.pos_embed = nn.Parameter(torch.randn(1, embed_len, embed_dim) * 0.02) | |
self.blocks = nn.Sequential( | |
*[VitBlock(embed_dim, use_flash_attn=use_flash_attn) for _ in range(27)] | |
) | |
self.norm = nn.LayerNorm(embed_dim) | |
def forward(self, x): | |
x = self.patch_embed(x) | |
x = x + self.pos_embed | |
for block in self.blocks: | |
x = block(x) | |
return self.norm(x) | |
class EncoderWrapper(nn.Module): | |
def __init__(self, use_flash_attn=False): | |
super().__init__() | |
self.model = nn.ModuleDict({"visual": VisionTransformer(use_flash_attn)}) | |
def forward(self, x): | |
return self.model["visual"](x) | |
class LinearPatchEmbedding(nn.Module): | |
def __init__(self): | |
super().__init__() | |
self.linear = nn.Linear(588, 1152) | |
def forward(self, x): | |
b, c, hp1, wp2 = x.shape | |
p1, p2 = 14, 14 | |
h, w = hp1 // p1, wp2 // p2 | |
x = x.reshape(b, c, h, p1, w, p2) | |
x = x.permute(0, 2, 4, 1, 3, 5) | |
x = x.reshape(b, h * w, c * p1 * p2) | |
return self.linear(x) | |
class MLP(nn.Module): | |
def __init__( | |
self, | |
in_features: int, | |
hidden_features: int = None, | |
out_features: int = None, | |
) -> None: | |
super().__init__() | |
out_features = out_features or in_features | |
hidden_features = hidden_features or in_features | |
self.fc1 = nn.Linear(in_features, hidden_features) | |
self.act = nn.GELU(approximate="tanh") | |
self.fc2 = nn.Linear(hidden_features, out_features) | |
torch.nn.init.kaiming_normal_( | |
self.fc1.weight, mode="fan_in", nonlinearity="relu" | |
) | |
torch.nn.init.kaiming_normal_( | |
self.fc2.weight, mode="fan_in", nonlinearity="relu" | |
) | |
def forward(self, x: torch.Tensor) -> torch.Tensor: | |
x = self.fc1(x) | |
x = self.act(x) | |
x = self.fc2(x) | |
return x | |
class VisionProjection(nn.Module): | |
def __init__(self): | |
super().__init__() | |
image_embedding_dim = 1152 | |
model_dim = 2048 | |
hidden_dim = model_dim * 4 | |
self.mlp = MLP(image_embedding_dim * 2, hidden_dim, model_dim) | |
def device(self): | |
return self.mlp.fc1.weight.device | |
def forward(self, x): | |
return self.mlp(x) | |
def create_patches(image, patch_size=(378, 378)): | |
assert image.dim() == 3, "Image must be in CHW format" | |
_, height, width = image.shape # Channels, Height, Width | |
patch_height, patch_width = patch_size | |
if height == patch_height and width == patch_width: | |
return [] | |
# Iterate over the image and create patches | |
patches = [] | |
for i in range(0, height, patch_height): | |
row_patches = [] | |
for j in range(0, width, patch_width): | |
patch = image[:, i : i + patch_height, j : j + patch_width] | |
row_patches.append(patch) | |
patches.append(torch.stack(row_patches)) | |
return patches | |
class VisionEncoder(nn.Module): | |
def __init__(self, use_flash_attn=False): | |
super().__init__() | |
self.encoder = EncoderWrapper(use_flash_attn) | |
self.projection = VisionProjection() | |
self.supported_sizes = [(378, 378), (378, 756), (756, 378), (756, 756)] | |
def device(self): | |
return self.projection.mlp.fc1.weight.device | |
def dtype(self): | |
return self.projection.mlp.fc1.weight.dtype | |
def preprocess(self, image: PIL.Image.Image): | |
width, height = image.size | |
max_dim = max(width, height) | |
if max_dim < 512: | |
im_size = (378, 378) | |
else: | |
aspect_ratio = width / height | |
im_size = min( | |
self.supported_sizes, | |
key=lambda size: ( | |
abs((size[1] / size[0]) - aspect_ratio), | |
abs(size[0] - width) + abs(size[1] - height), | |
), | |
) | |
return Compose( | |
[ | |
Resize(size=im_size, interpolation=InterpolationMode.BICUBIC), | |
ToImage(), | |
ToDtype(torch.float16, scale=True), | |
Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]), | |
] | |
)(image) | |
def forward( | |
self, images: Union[PIL.Image.Image, list[PIL.Image.Image], torch.Tensor] | |
) -> torch.Tensor: | |
im_list = None | |
if isinstance(images, torch.Tensor): | |
# Input must have dimensions (B, C, H, W) | |
assert ( | |
len(images.shape) == 4 | |
), "Tensor input must have dimensions (B, C, H, W)" | |
im_list = list(images) | |
elif isinstance(images, PIL.Image.Image): | |
im_list = [images] | |
elif isinstance(images, list): | |
im_list = images | |
else: | |
raise ValueError( | |
"Input must be a PIL image, list of PIL images, or a tensor" | |
) | |
# Preprocess unless the images are already tensors (indicating that | |
# they have already been preprocessed) | |
if not isinstance(im_list[0], torch.Tensor): | |
im_list = [self.preprocess(im.convert("RGB")) for im in im_list] | |
patches = [create_patches(im) for im in im_list] | |
flat_patches = [patch for image_patches in patches for patch in image_patches] | |
# Images may be variable size, and need to be resized to a common size after | |
# creating patches. | |
resized_images = [ | |
F.interpolate(im.unsqueeze(0), size=(378, 378), mode="bilinear") | |
for im in im_list | |
] | |
combined_images = torch.cat([*resized_images, *flat_patches], dim=0) | |
combined_images = combined_images.to(self.device, dtype=self.dtype) | |
combined_features = self.encoder(combined_images) | |
full_img_features = combined_features[: len(im_list)] | |
patch_features = ( | |
combined_features[len(im_list) :].transpose(1, 2).view(-1, 1152, 27, 27) | |
) | |
# Reshape patch features back to their original structure | |
reshaped_patch_features = [] | |
patch_idx = 0 | |
for i, patch_set in enumerate(patches): | |
if len(patch_set) == 0: | |
reshaped_patch_features.append( | |
full_img_features[i].transpose(0, 1).view(1152, 27, 27) | |
) | |
else: | |
sample_features = [] | |
for row_patches in patch_set: | |
row_len = len(row_patches) | |
row_features = patch_features[ | |
patch_idx : patch_idx + row_len | |
] # row_len, T, C | |
row_features = torch.cat( | |
list(row_features), dim=2 | |
) # T, C * row_len | |
patch_idx += row_len | |
sample_features.append(row_features) | |
sample_features = torch.cat(sample_features, dim=1) | |
sample_features = F.adaptive_avg_pool2d( | |
sample_features, output_size=(27, 27) | |
) | |
reshaped_patch_features.append(sample_features) | |
reshaped_patch_features = ( | |
torch.stack(reshaped_patch_features).view(-1, 1152, 729).transpose(1, 2) | |
) | |
final_features = torch.cat([full_img_features, reshaped_patch_features], dim=2) | |
return self.projection(final_features) | |