|
""" |
|
This module defines a self-contained StarVector model with support for remote code loading. |
|
""" |
|
|
|
import os |
|
import torch |
|
import torch.nn as nn |
|
from transformers import PreTrainedModel, PretrainedConfig |
|
from typing import Optional, Union, List |
|
from abc import ABC, abstractmethod |
|
|
|
|
|
from .starvector.image_encoder import ImageEncoder |
|
from .starvector.adapter import Adapter |
|
|
|
|
|
|
|
class StarVectorConfig(PretrainedConfig):
    """Hugging Face configuration container for StarVector checkpoints.

    Holds the LLM backbone name, image-encoder/adapter settings, and the
    generation/training context lengths. All arguments are stored verbatim
    on the instance so `PretrainedConfig` serialization round-trips them.
    """

    model_type = "starvector"

    def __init__(
        self,
        starcoder_model_name: str = "bigcode/starcoderbase-1b",
        image_encoder_type: str = "clip",
        adapter_norm: str = "layer_norm",
        image_size: int = 224,
        max_length: int = 8192,
        max_length_train: int = 8192,
        use_flash_attn: bool = True,
        use_cache: bool = True,
        num_attention_heads: int = 16,
        num_hidden_layers: int = 24,
        vocab_size: int = 49152,
        hidden_size: int = 2048,
        num_kv_heads: int = 4,
        torch_dtype: str = "bfloat16",
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Mirror every explicit argument onto the instance so the HF
        # config machinery can serialize/deserialize it unchanged.
        for attr_name, attr_value in (
            ("starcoder_model_name", starcoder_model_name),
            ("image_encoder_type", image_encoder_type),
            ("adapter_norm", adapter_norm),
            ("image_size", image_size),
            ("max_length", max_length),
            ("max_length_train", max_length_train),
            ("use_flash_attn", use_flash_attn),
            ("use_cache", use_cache),
            ("num_attention_heads", num_attention_heads),
            ("num_hidden_layers", num_hidden_layers),
            ("vocab_size", vocab_size),
            ("hidden_size", hidden_size),
            ("num_kv_heads", num_kv_heads),
            ("torch_dtype", torch_dtype),
        ):
            setattr(self, attr_name, attr_value)
|
|
|
|
|
|
|
class StarVectorBase(nn.Module, ABC):
    """Shared backbone for StarVector models (v1 StarCoder / v2 StarCoder2).

    Owns the SVG language model (`svg_transformer`, supplied by subclasses)
    and, for the 'im2svg' task, an image encoder plus a projection adapter
    that maps visual features into the LLM embedding space.
    """

    def __init__(self, config, **kwargs):
        super().__init__()
        # Task selects the conditioning mode; only 'im2svg' builds the
        # image encoder / projection stack.
        self.task = kwargs.get('task', 'im2svg')
        self.model_precision = kwargs.get('model_precision', config.torch_dtype)

        # Version-specific LLM backbone, provided by the concrete subclass.
        self.svg_transformer = self._get_svg_transformer(config, **kwargs)

        if self.use_image_encoder():
            self.image_encoder = ImageEncoder(config, **kwargs)
            # NOTE: get_adapter() also sets self.query_length as a side effect,
            # which the max_length computation below depends on.
            self.image_projection = self.get_adapter(config, **kwargs).to(dtype=self.model_precision)
        else:
            self.query_length = 0

        # Budget for generated tokens: training context minus the visual
        # query tokens, minus a margin of 4 (presumably special/control
        # tokens — TODO confirm against the tokenizer setup).
        self.max_length = config.max_length_train - getattr(self, "query_length", 0) - 4
        self.train_image_encoder = kwargs.get('train_image_encoder', False)
        self.train_LLM = kwargs.get('train_LLM', False)

        self._freeze_parameters(self.train_image_encoder, self.train_LLM)

    @abstractmethod
    def _get_svg_transformer(self, config, **kwargs):
        """Get SVG transformer model - implementation differs between versions"""
        pass

    def _freeze_parameters(self, train_image_encoder, train_LLM):
        """Toggle requires_grad on the vision stack and the LLM independently."""
        if self.use_image_encoder():
            for _, param in self.image_encoder.named_parameters():
                param.requires_grad = train_image_encoder
            for _, param in self.image_projection.named_parameters():
                param.requires_grad = train_image_encoder
        for _, param in self.svg_transformer.transformer.named_parameters():
            param.requires_grad = train_LLM

    def use_image_encoder(self):
        """Return True when the task consumes images ('im2svg')."""
        return self.task == 'im2svg'

    def get_adapter(self, config, **kwargs):
        """Build the vision->LLM projection adapter.

        Side effect: sets self.query_length to the number of visual tokens
        produced by the chosen image encoder.
        """
        if config.image_encoder_type == 'clip':
            hidden_size = self.image_encoder.num_features
            self.query_length = 257
        elif config.image_encoder_type == 'vqgan':
            hidden_size = 256
            self.query_length = 196
        else:
            hidden_size = 256
            self.query_length = 200
        llm_hidden_size = config.hidden_size
        return Adapter(hidden_size, llm_hidden_size, adapter_norm=config.adapter_norm, query_length=self.query_length, dropout_prob=kwargs.get('dropout', 0.1))

    def forward(self, batch):
        """Produce input embeddings for a batch.

        Returns a (batch, max_length, hidden) tensor of embeddings looked up
        from random token ids. NOTE(review): random ids look like stub or
        profiling code rather than a real training path — confirm intent.
        """
        image = batch["image"]
        if self.use_image_encoder():
            embedded_image = self.image_encoder(image)
            # NOTE(review): conditioning embeds are computed but never
            # consumed below — presumably a placeholder; confirm.
            conditioning_embeds = self.image_projection(embedded_image)

        wte = self.svg_transformer.transformer.wte
        # Fix: create the ids on the same device as the batch so the
        # embedding lookup does not fail when the model lives on GPU
        # (torch.randint defaults to CPU otherwise).
        input_ids = torch.randint(
            0,
            wte.num_embeddings,
            (image.size(0), self.max_length),
            device=image.device,
        )
        return wte(input_ids)

    def generate_im2svg(self, batch, **kwargs):
        """Generate SVG strings conditioned on images.

        Uses the projected image embeddings as `inputs_embeds` when the
        image encoder is active; otherwise feeds a zero placeholder.
        """
        image = batch["image"]
        if self.use_image_encoder():
            embedded_image = self.image_encoder(image)
            conditioning_embeds = self.image_projection(embedded_image)
        else:
            conditioning_embeds = torch.zeros((image.size(0), 10, 1), device=image.device)
        generation_output = self.svg_transformer.transformer.generate(inputs_embeds=conditioning_embeds, max_length=kwargs.get('max_length', 30))
        raw_svg = self.svg_transformer.tokenizer.batch_decode(generation_output, skip_special_tokens=True)
        return raw_svg

    @abstractmethod
    def _get_embeddings(self, input_ids):
        """Get embeddings from input ids - implementation differs between v1 and v2"""
        pass

    @abstractmethod
    def _get_svg_text(self, svg_list):
        """Get SVG text with appropriate end tokens - implementation differs between v1 and v2"""
        pass
|
|
|
|
|
class StarVectorStarCoder(StarVectorBase):
    """StarVector variant backed by the original StarCoder (v1) LLM."""

    def _get_svg_transformer(self, config, **kwargs):
        # Imported lazily so loading the v2 model never pulls in v1 modules.
        from starvector.model.llm.starcoder import StarCoderModel
        return StarCoderModel(config, **kwargs)

    def _get_embeddings(self, input_ids):
        """Look up token embeddings via the v1 (GPT-2 style) module path."""
        wte = self.svg_transformer.transformer.transformer.wte
        return wte(input_ids)

    def _get_svg_text(self, svg_list):
        """Append the tokenizer's EOS marker to each SVG string (v1 scheme)."""
        eos = self.svg_transformer.tokenizer.eos_token
        return [svg + eos for svg in svg_list]
|
|
|
|
|
class StarVectorStarCoder2(StarVectorBase):
    """StarVector variant backed by the StarCoder2 (v2) LLM."""

    def _get_svg_transformer(self, config, **kwargs):
        # Imported lazily so loading the v1 model never pulls in v2 modules.
        from starvector.model.llm.starcoder2 import StarCoderModel
        return StarCoderModel(config, **kwargs)

    def _get_embeddings(self, input_ids):
        """Look up token embeddings via the v2 (Llama-style) module path."""
        embed_tokens = self.svg_transformer.transformer.model.embed_tokens
        return embed_tokens(input_ids)

    def _get_svg_text(self, svg_list):
        """Append the SVG end token followed by EOS to each string (v2 scheme)."""
        suffix = self.svg_transformer.svg_end_token + self.svg_transformer.tokenizer.eos_token
        return [svg + suffix for svg in svg_list]

    def _get_im2svg_specific_kwargs(self, kwargs):
        """Stop im2svg generation at the dedicated SVG end token (v2)."""
        return {
            'eos_token_id': self.svg_transformer.svg_end_token_id,
        }

    def _get_text2svg_specific_kwargs(self, kwargs):
        """Stop text2svg generation at the tokenizer's EOS token (v2)."""
        return {
            'eos_token_id': self.svg_transformer.tokenizer.eos_token_id,
        }
|
|
|
|
|
|
|
class StarVectorForCausalLM(PreTrainedModel):
    """Hugging Face wrapper that dispatches to the v1 or v2 StarVector backbone."""

    config_class = StarVectorConfig
    _no_split_modules = []

    def __init__(self, config, **kwargs):
        super().__init__(config)
        # Route on the checkpoint name: anything mentioning "starcoder2"
        # gets the v2 backbone; everything else falls back to v1.
        backbone_cls = (
            StarVectorStarCoder2
            if "starcoder2" in config.starcoder_model_name.lower()
            else StarVectorStarCoder
        )
        self.model = backbone_cls(config=config, **kwargs)

    def forward(self, batch):
        """Delegate the forward pass to the wrapped backbone."""
        return self.model(batch)

    def generate_im2svg(self, batch, **kwargs):
        """Generate SVG code conditioned on the images in `batch`."""
        return self.model.generate_im2svg(batch, **kwargs)

    def generate_im2text(self, batch, **kwargs):
        """Generate text conditioned on images.

        NOTE(review): the backbones defined in this file do not implement
        generate_im2text — presumably provided elsewhere; confirm before use.
        """
        return self.model.generate_im2text(batch, **kwargs)

    def process_images(self, images):
        """Preprocess raw images with the backbone's image encoder."""
        return self.model.image_encoder.process_images(images)
|
|
|
|
|
|
|
# Register both classes for Hugging Face AutoClass loading so remote-code
# checkpoints can be opened with AutoConfig / AutoModelForCausalLM
# (trust_remote_code=True populates the auto_map entries in config.json).
StarVectorConfig.register_for_auto_class()

StarVectorForCausalLM.register_for_auto_class("AutoModelForCausalLM")