# For licensing see accompanying LICENSE file. # Copyright (C) 2024 TinyLLaVA. All Rights Reserved. import time import numpy as np import dataclasses from enum import auto, Enum from typing import List, Tuple, Optional, Union import requests from PIL import Image from io import BytesIO import base64 import re import torch import torch.utils.checkpoint from torch import nn from torch.nn import functional as F from transformers.utils import logging from transformers import PreTrainedModel from transformers.modeling_outputs import CausalLMOutputWithPast from transformers.generation.utils import GenerateOutput from transformers import CLIPVisionModel, CLIPImageProcessor, SiglipVisionModel, SiglipImageProcessor from .configuration import TinyLlavaConfig, IGNORE_INDEX, IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN from transformers import AutoConfig, AutoModelForCausalLM, PhiForCausalLM logger = logging.get_logger(__name__) # Model Constants IGNORE_INDEX = -100 IMAGE_TOKEN_INDEX = -200 DEFAULT_IMAGE_TOKEN = "" DEFAULT_IMAGE_PATCH_TOKEN = "" DEFAULT_IM_START_TOKEN = "" DEFAULT_IM_END_TOKEN = "" IMAGE_PLACEHOLDER = "" CONTROLLER_HEART_BEAT_EXPIRATION = 30 WORKER_HEART_BEAT_INTERVAL = 15 LOGDIR = "." class SeparatorStyle(Enum): """Different separator style.""" SINGLE = auto() TWO = auto() MPT = auto() PLAIN = auto() LLAMA_2 = auto() TINY_LLAMA = auto() QWEN_2 = auto() @dataclasses.dataclass class Conversation: """A class that keeps all conversation history.""" system: str roles: List[str] messages: List[List[str]] offset: int sep_style: SeparatorStyle = SeparatorStyle.SINGLE sep: str = "###" sep2: str = None version: str = "Unknown" skip_next: bool = False def get_prompt(self): messages = self.messages if len(messages) > 0 and type(messages[0][1]) is tuple: messages = self.messages.copy() init_role, init_msg = messages[0].copy() init_msg = init_msg[0].replace("", "").strip() if 'mmtag' in self.version: messages[0] = (init_role, init_msg) messages.insert(0, (self.roles[0], "")) messages.insert(1, (self.roles[1], "Received.")) else: messages[0] = (init_role, "\n" + init_msg) if self.sep_style == SeparatorStyle.TWO: seps = [self.sep, self.sep2] ret = self.system + seps[0] for i, (role, message) in enumerate(messages): if message: if type(message) is tuple: message, _, _ = message ret += role + ": " + message + seps[i % 2] else: ret += role + ":" else: raise ValueError(f"Invalid style: {self.sep_style}") return ret def append_message(self, role, message): self.messages.append([role, message]) def copy(self): return Conversation( system=self.system, roles=self.roles, messages=[[x, y] for x, y in self.messages], offset=self.offset, sep_style=self.sep_style, sep=self.sep, sep2=self.sep2, version=self.version) conv_phi_v0 = Conversation( system="A chat between a curious user and an artificial intelligence assistant. " "The assistant gives helpful, detailed, and polite answers to the user's questions.", roles=("USER", "ASSISTANT"), version="phi", messages=(), offset=0, sep_style=SeparatorStyle.TWO, sep=" ", sep2="<|endoftext|>", ) def load_image_from_base64(image): return Image.open(BytesIO(base64.b64decode(image))) def expand2square(pil_img, background_color): width, height = pil_img.size if width == height: return pil_img elif width > height: result = Image.new(pil_img.mode, (width, width), background_color) result.paste(pil_img, (0, (width - height) // 2)) return result else: result = Image.new(pil_img.mode, (height, height), background_color) result.paste(pil_img, ((height - width) // 2, 0)) return result def process_images(images, image_processor, model_cfg): image_aspect_ratio = getattr(model_cfg, "image_aspect_ratio", None) new_images = [] if image_aspect_ratio == 'pad': for image in images: image = expand2square(image, tuple(int(x*255) for x in image_processor.image_mean)) image = image_processor.preprocess(image, return_tensors='pt')['pixel_values'][0] new_images.append(image) else: return image_processor(images, return_tensors='pt')['pixel_values'] if all(x.shape == new_images[0].shape for x in new_images): new_images = torch.stack(new_images, dim=0) return new_images def tokenizer_image_token(prompts, tokenizer, image_token_index=IMAGE_TOKEN_INDEX, return_tensors=None): def process_single_prompt(prompt): prompt_chunks = [tokenizer(chunk).input_ids for chunk in prompt.split('')] def insert_separator(X, sep): return [ele for sublist in zip(X, [sep]*len(X)) for ele in sublist][:-1] input_ids = [] offset = 0 if len(prompt_chunks) > 0 and len(prompt_chunks[0]) > 0 and prompt_chunks[0][0] == tokenizer.bos_token_id: offset = 1 input_ids.append(prompt_chunks[0][0]) for x in insert_separator(prompt_chunks, [image_token_index] * (offset + 1)): input_ids.extend(x[offset:]) return input_ids if isinstance(prompts, str): # Handle single prompt return process_single_prompt(prompts) # Handle batch of prompts batch_input_ids = [process_single_prompt(prompt) for prompt in prompts] if return_tensors is not None: if return_tensors == 'pt': max_length = max(len(ids) for ids in batch_input_ids) padded_input_ids = [ ids + [tokenizer.pad_token_id] * (max_length - len(ids)) for ids in batch_input_ids ] return torch.tensor(padded_input_ids, dtype=torch.long) raise ValueError(f'Unsupported tensor type: {return_tensors}') return batch_input_ids def load_image(image_file): if image_file.startswith("http") or image_file.startswith("https"): response = requests.get(image_file) image = Image.open(BytesIO(response.content)).convert("RGB") else: image = Image.open(image_file).convert("RGB") return image ACT_TYPE = { 'relu': nn.ReLU, 'gelu': nn.GELU } class Connector(nn.Module): def __init__(self, config=None): super().__init__() mlp_gelu_match = re.match(r'^mlp(\d+)x_gelu$', config.connector_type) act_type = config.connector_type.split('_')[-1] mlp_depth = int(mlp_gelu_match.group(1)) modules = [nn.Linear(config.vision_hidden_size, config.hidden_size)] for _ in range(1, mlp_depth): modules.append(ACT_TYPE[act_type]()) modules.append(nn.Linear(config.hidden_size, config.hidden_size)) self._connector = nn.Sequential(*modules) def forward(self, x): return self._connector(x) class VisionTower(nn.Module): def __init__(self, cfg, model_name_or_path = 'clip'): super().__init__() if 'clip' in model_name_or_path: self._vision_tower = CLIPVisionModel(cfg) self._image_processor = CLIPImageProcessor.from_pretrained(cfg.model_name_or_path) else: self._vision_tower = SiglipVisionModel(cfg) self._image_processor = SiglipImageProcessor.from_pretrained(cfg.model_name_or_path) self.config = cfg def forward(self, x, **kwargs): image_features = self._vision_tower(x, output_hidden_states=True) image_features = image_features.hidden_states[kwargs.get('vision_feature_layer', -2)] if kwargs.get('vision_feature_select_strategy', 'patch') == 'patch': image_features = image_features[:, 1:] elif kwargs.get('vision_feature_select_strategy', 'patch') == 'cls_patch': image_features = image_features else: raise ValueError(f"Unexpected select feature: {kwargs.get('vision_feature_select_strategy')}") return image_features @property def vision_tower(self): return self._vision_tower @vision_tower.setter def vision_tower(self, vision_tower): self._vision_tower = vision_tower def get_value_from_kwargs(kwargs, name): if name in kwargs: return kwargs.pop(name) else: return None class TinyLlavaPreTrainedModel(PreTrainedModel): config_class = TinyLlavaConfig base_model_prefix = "model" supports_gradient_checkpointing = True _no_split_modules = ["LlavaVisionAttention"] _skip_keys_device_placement = "past_key_values" _supports_flash_attn_2 = True def _init_weights(self, module): std = ( self.config.initializer_range if hasattr(self.config, "initializer_range") else self.config.text_config.initializer_range ) if hasattr(module, "class_embedding"): module.class_embedding.data.normal_(mean=0.0, std=std) if isinstance(module, (nn.Linear, nn.Conv2d)): module.weight.data.normal_(mean=0.0, std=std) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=std) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() @property def _supports_sdpa(self): return self.language_model._supports_sdpa class TinyLlavaForConditionalGeneration(TinyLlavaPreTrainedModel): def __init__(self, config: TinyLlavaConfig): super().__init__(config) self.language_model = PhiForCausalLM(config.text_config) self.vision_tower = VisionTower(config.vision_config, config.vision_model_name_or_path) self.connector = Connector(config) self.post_init() def get_input_embeddings(self): return self.language_model.get_input_embeddings() def set_input_embeddings(self, value): self.language_model.set_input_embeddings(value) def get_output_embeddings(self): return self.language_model.get_output_embeddings() def set_output_embeddings(self, new_embeddings): self.language_model.set_output_embeddings(new_embeddings) def set_decoder(self, decoder): self.language_model.set_decoder(decoder) def get_decoder(self): return self.language_model.get_decoder() def tie_weights(self): return self.language_model.tie_weights() def resize_token_embeddings(self, new_num_tokens: Optional[int] = None, pad_to_multiple_of=None) -> nn.Embedding: model_embeds = self.language_model.resize_token_embeddings(new_num_tokens, pad_to_multiple_of) # update vocab size self.config.text_config.vocab_size = model_embeds.num_embeddings self.config.vocab_size = model_embeds.num_embeddings self.vocab_size = model_embeds.num_embeddings return model_embeds def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[List[torch.FloatTensor]] = None, inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, images: Optional[torch.FloatTensor] = None, image_sizes: Optional[List[List[int]]] = None, return_dict: Optional[bool] = None, ) -> Union[Tuple, CausalLMOutputWithPast]: use_cache = use_cache if use_cache is not None else self.config.use_cache if inputs_embeds is None: ( input_ids, position_ids, attention_mask, past_key_values, inputs_embeds, labels ) = self.prepare_inputs_labels_for_multimodal( input_ids, position_ids, attention_mask, past_key_values, labels, images, image_sizes ) return self.language_model.forward( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, inputs_embeds=inputs_embeds, labels=labels, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict ) @torch.no_grad() def generate( self, inputs: Optional[torch.Tensor] = None, images: Optional[torch.Tensor] = None, image_sizes: Optional[torch.Tensor] = None, **kwargs, ) -> Union[GenerateOutput, torch.LongTensor]: position_ids = kwargs.pop("position_ids", None) attention_mask = kwargs.pop("attention_mask", None) if "inputs_embeds" in kwargs: raise NotImplementedError("`inputs_embeds` is not supported") if images is not None: ( inputs, position_ids, attention_mask, _, inputs_embeds, _ ) = self.prepare_inputs_labels_for_multimodal( inputs, position_ids, attention_mask, None, None, images, image_sizes=image_sizes ) else: inputs_embeds = self.language_model.get_input_embeddings()(inputs) return self.language_model.generate( position_ids=position_ids, attention_mask=attention_mask, inputs_embeds=inputs_embeds, **kwargs ) def encode_images(self, images): kwargs = {} kwargs['vision_feature_layer'] = self.config.vision_feature_layer kwargs['vision_feature_select_strategy'] = self.config.vision_feature_select_strategy images = images.to(device=self.device, dtype=self.dtype) image_features = self.vision_tower(images, **kwargs) image_features = self.connector(image_features) return image_features def prepare_inputs_for_generation(self, input_ids, past_key_values=None, inputs_embeds=None, **kwargs): images = kwargs.pop("images", None) image_sizes = kwargs.pop("image_sizes", None) inputs = self.language_model.prepare_inputs_for_generation( input_ids, past_key_values=past_key_values, inputs_embeds=inputs_embeds, **kwargs ) if images is not None: inputs['images'] = images if image_sizes is not None: inputs['image_sizes'] = image_sizes return inputs def prepare_inputs_labels_for_multimodal( self, input_ids, position_ids, attention_mask, past_key_values, labels, images, image_sizes=None ): vision_tower = self.vision_tower if vision_tower is None or images is None or input_ids.shape[1] == 1: return input_ids, position_ids, attention_mask, past_key_values, None, labels image_features = self.encode_images(images) # TODO: image start / end is not implemented here to support pretraining. if getattr(self.config, 'tune_mm_mlp_adapter', False): raise NotImplementedError # Let's just add dummy tensors if they do not exist, # it is a headache to deal with None all the time. # But it is not ideal, and if you have a better idea, # please open an issue / submit a PR, thanks. _labels = labels _position_ids = position_ids _attention_mask = attention_mask if attention_mask is None: attention_mask = torch.ones_like(input_ids, dtype=torch.bool) else: attention_mask = attention_mask.bool() if position_ids is None: position_ids = torch.arange(0, input_ids.shape[1], dtype=torch.long, device=input_ids.device) if labels is None: labels = torch.full_like(input_ids, IGNORE_INDEX) # remove the padding using attention_mask -- FIXME _input_ids = input_ids input_ids = [cur_input_ids[cur_attention_mask] for cur_input_ids, cur_attention_mask in zip(input_ids, attention_mask)] labels = [cur_labels[cur_attention_mask] for cur_labels, cur_attention_mask in zip(labels, attention_mask)] new_input_embeds = [] new_labels = [] cur_image_idx = 0 for batch_idx, cur_input_ids in enumerate(input_ids): num_images = (cur_input_ids == IMAGE_TOKEN_INDEX).sum() if num_images == 0: cur_image_features = image_features[cur_image_idx] cur_input_embeds_1 = self.language_model.get_input_embeddings()(cur_input_ids) cur_input_embeds = torch.cat([cur_input_embeds_1, cur_image_features[0:0]], dim=0) new_input_embeds.append(cur_input_embeds) new_labels.append(labels[batch_idx]) cur_image_idx += 1 continue image_token_indices = [-1] + torch.where(cur_input_ids == IMAGE_TOKEN_INDEX)[0].tolist() + [cur_input_ids.shape[0]] cur_input_ids_noim = [] cur_labels = labels[batch_idx] cur_labels_noim = [] for i in range(len(image_token_indices) - 1): cur_input_ids_noim.append(cur_input_ids[image_token_indices[i]+1:image_token_indices[i+1]]) cur_labels_noim.append(cur_labels[image_token_indices[i]+1:image_token_indices[i+1]]) split_sizes = [x.shape[0] for x in cur_labels_noim] cur_input_embeds = self.language_model.get_input_embeddings()(torch.cat(cur_input_ids_noim)) cur_input_embeds_no_im = torch.split(cur_input_embeds, split_sizes, dim=0) cur_new_input_embeds = [] cur_new_labels = [] for i in range(num_images + 1): cur_new_input_embeds.append(cur_input_embeds_no_im[i]) cur_new_labels.append(cur_labels_noim[i]) if i < num_images: cur_image_features = image_features[cur_image_idx] cur_image_idx += 1 cur_new_input_embeds.append(cur_image_features) cur_new_labels.append(torch.full((cur_image_features.shape[0],), IGNORE_INDEX, device=cur_labels.device, dtype=cur_labels.dtype)) cur_new_input_embeds = [x.to(self.device) for x in cur_new_input_embeds] cur_new_input_embeds = torch.cat(cur_new_input_embeds) cur_new_labels = torch.cat(cur_new_labels) new_input_embeds.append(cur_new_input_embeds) new_labels.append(cur_new_labels) # Truncate sequences to max length as image embeddings can make the sequence longer tokenizer_model_max_length = getattr(self.config, 'tokenizer_model_max_length', None) if tokenizer_model_max_length is not None: new_input_embeds = [x[:tokenizer_model_max_length] for x in new_input_embeds] new_labels = [x[:tokenizer_model_max_length] for x in new_labels] # Combine them max_len = max(x.shape[0] for x in new_input_embeds) batch_size = len(new_input_embeds) new_input_embeds_padded = [] new_labels_padded = torch.full((batch_size, max_len), IGNORE_INDEX, dtype=new_labels[0].dtype, device=new_labels[0].device) attention_mask = torch.zeros((batch_size, max_len), dtype=attention_mask.dtype, device=attention_mask.device) position_ids = torch.zeros((batch_size, max_len), dtype=position_ids.dtype, device=position_ids.device) for i, (cur_new_embed, cur_new_labels) in enumerate(zip(new_input_embeds, new_labels)): cur_len = cur_new_embed.shape[0] if getattr(self.config, 'tokenizer_padding_side', 'right') == "left": new_input_embeds_padded.append(torch.cat(( torch.zeros((max_len - cur_len, cur_new_embed.shape[1]), dtype=cur_new_embed.dtype, device=cur_new_embed.device), cur_new_embed ), dim=0)) if cur_len > 0: new_labels_padded[i, -cur_len:] = cur_new_labels attention_mask[i, -cur_len:] = True position_ids[i, -cur_len:] = torch.arange(0, cur_len, dtype=position_ids.dtype, device=position_ids.device) else: new_input_embeds_padded.append(torch.cat(( cur_new_embed, torch.zeros((max_len - cur_len, cur_new_embed.shape[1]), dtype=cur_new_embed.dtype, device=cur_new_embed.device) ), dim=0)) if cur_len > 0: new_labels_padded[i, :cur_len] = cur_new_labels attention_mask[i, :cur_len] = True position_ids[i, :cur_len] = torch.arange(0, cur_len, dtype=position_ids.dtype, device=position_ids.device) new_input_embeds = torch.stack(new_input_embeds_padded, dim=0) if _labels is None: new_labels = None else: new_labels = new_labels_padded if _attention_mask is None: attention_mask = None else: attention_mask = attention_mask.to(dtype=_attention_mask.dtype) if _position_ids is None: position_ids = None return None, position_ids, attention_mask, past_key_values, new_input_embeds, new_labels def chat( self, prompts: Union[list, str], tokenizer=None, images: Union[list, str] = None, max_new_tokens: int = 512, num_beams=1, top_p=None, temperature=0 ): """ Generate responses for a batch of prompts. Args: prompts (list): List of text prompts. tokenizer: Tokenizer object. images (list, optional): List of image file paths corresponding to the prompts. Defaults to None. max_new_tokens (int): Maximum number of new tokens to generate. Defaults to 512. num_beams (int): Number of beams for beam search. Defaults to 1. top_p (float, optional): Nucleus sampling probability. Defaults to None. temperature (float): Sampling temperature. Defaults to 0. Returns: list: List of generated outputs. list: List of generation times for each batch. """ if isinstance(prompts, list) and isinstance(images, str): assert len(prompts) == len(images) or images is None, "Mismatch between prompts and images." else: prompts = [prompts] images = [images] image_processor = self.vision_tower._image_processor # Prepare inputs input_texts = [] image_tensors = None for i, prompt in enumerate(prompts): if images and images[i] is not None: prompt = DEFAULT_IMAGE_TOKEN + '\n' + prompt conv = conv_phi_v0.copy() conv.append_message(conv.roles[0], prompt) conv.append_message(conv.roles[1], None) input_texts.append(conv.get_prompt()) # Tokenize prompts input_ids = tokenizer_image_token( input_texts, tokenizer, IMAGE_TOKEN_INDEX, return_tensors="pt" ).to(self.device) # Process images if images: processed_images = [ process_images(load_image(image), image_processor, self.config) for image in images if image is not None ] image_tensors = torch.stack(processed_images).to(self.device).squeeze(1) # Generate responses stime = time.time() with torch.inference_mode(): output_ids = self.generate( input_ids, images=image_tensors, do_sample=True if temperature > 0 else False, temperature=temperature, top_p=top_p, num_beams=num_beams, pad_token_id=tokenizer.pad_token_id, max_new_tokens=max_new_tokens, use_cache=True, ) generation_time = time.time() - stime # Decode outputs outputs = tokenizer.batch_decode(output_ids, skip_special_tokens=True) outputs = [output.strip() for output in outputs] return outputs, generation_time AutoConfig.register("tinyllava", TinyLlavaConfig) AutoModelForCausalLM.register(TinyLlavaConfig, TinyLlavaForConditionalGeneration)