# Copyright (c) OpenMMLab. All rights reserved.
from typing import List, Optional

import torch
from mmengine.model import BaseModel
from torch import nn

from mmpretrain.registry import MODELS, TOKENIZER
from mmpretrain.structures import DataSample


@MODELS.register_module()
class Blip2Caption(BaseModel):
    """BLIP2 Caption.

    Module for BLIP2 Caption task.

    Args:
        vision_backbone (dict): The config dict for vision backbone.
        text_backbone (dict): The config dict for text backbone.
        multimodal_backbone (dict): The config dict for multimodal backbone.
        vision_neck (dict): The config dict for vision neck.
        tokenizer (Optional[dict]): The config for tokenizer.
            Defaults to None.
        prompt (str): Prompt used for training and eval.
            Defaults to ''.
        max_txt_len (int): Max text length of input text. Defaults to 20.
        num_captions (int): Number of captions to be generated for each
            image. Defaults to 1.
        data_preprocessor (Optional[dict]): The config for preprocessing input
            data. If None or no specified type, it will use
            "MultiModalDataPreprocessor" as type.
            See :class:`MultiModalDataPreprocessor` for more details.
            Defaults to None.
        init_cfg (Optional[dict]): the config to control the initialization.
            Defaults to None.
    """
    _no_split_modules = ['BEiTViT', 'OPTDecoderLayer', 'BertLayer']

    def __init__(self,
                 vision_backbone: dict,
                 text_backbone: dict,
                 multimodal_backbone: dict,
                 vision_neck: dict,
                 tokenizer: Optional[dict] = None,
                 prompt: str = '',
                 max_txt_len: int = 20,
                 num_captions: int = 1,
                 data_preprocessor: Optional[dict] = None,
                 init_cfg: Optional[dict] = None) -> None:
        if data_preprocessor is None:
            data_preprocessor = {}
        if isinstance(data_preprocessor, dict):
            # Fill in the default type on a shallow copy so the config dict
            # owned by the caller is never mutated.
            data_preprocessor = data_preprocessor.copy()
            data_preprocessor.setdefault('type', 'MultiModalDataPreprocessor')
            data_preprocessor = MODELS.build(data_preprocessor)

        super().__init__(
            init_cfg=init_cfg, data_preprocessor=data_preprocessor)

        self.tokenizer = TOKENIZER.build(tokenizer)
        # Generation stops at the first token id the tokenizer yields for
        # a newline.
        self.eos_token_id = self.tokenizer(
            '\n', add_special_tokens=False).input_ids[0]

        self.vision_backbone = MODELS.build(vision_backbone)
        self.ln_vision_backbone = nn.LayerNorm(self.vision_backbone.embed_dims)

        self.vision_neck = MODELS.build(vision_neck)

        self.text_backbone = MODELS.build(text_backbone)

        self.multimodal_backbone = MODELS.build(multimodal_backbone)
        # Drop the sub-modules of the multimodal backbone that this model
        # never calls (only ``bert`` is run, and only with query embeddings).
        self.multimodal_backbone.cls = None
        self.multimodal_backbone.bert.embeddings.word_embeddings = None
        self.multimodal_backbone.bert.embeddings.position_embeddings = None
        for layer in self.multimodal_backbone.bert.encoder.layer:
            layer.output = None
            layer.intermediate = None

        self.prompt = prompt
        self.max_txt_len = max_txt_len
        self.num_captions = num_captions
        # Number of prompt tokens; used in ``predict`` to slice the prompt
        # off the generated sequences before decoding.
        prompt_tokens = self.tokenizer(prompt, return_tensors='pt')
        self.prompt_length = prompt_tokens.attention_mask.sum(1)

        # Learnable query tokens, normally initialised with the std taken
        # from the multimodal backbone's config.
        bert_config = self.multimodal_backbone.bert.config
        self.query_tokens = nn.Parameter(
            torch.zeros(1, bert_config.query_length, bert_config.hidden_size))
        self.query_tokens.data.normal_(
            mean=0.0, std=bert_config.initializer_range)

        # freeze the text backbone
        for _, param in self.text_backbone.named_parameters():
            param.requires_grad = False

        # The hook API is not available on every torch version, so register
        # it only when present.
        if hasattr(self, 'register_load_state_dict_post_hook'):
            self.register_load_state_dict_post_hook(self._ignore_llm_keys_hook)

    def forward(
        self,
        images: torch.Tensor,
        data_samples: Optional[List] = None,
        mode: str = 'loss',
    ) -> List[DataSample]:
        """The unified entry for a forward process in both training and test.

        The method should accept two modes: "predict" and "loss":

        - "predict": Forward and return the predictions, which are fully
          processed to a list of :obj:`DataSample`.
        - "loss": Forward and return a dict of losses according to the given
          inputs and data samples.

        Note that this method handles neither back propagation nor
        optimizer updating, which are done in the :meth:`train_step`.

        NOTE(review): ``self.loss`` is not defined in this class as shown
        here, so "loss" mode relies on it being provided elsewhere.

        Args:
            images (torch.Tensor): pre_processed img tensor (N, C, ...).
            data_samples (List[DataSample], optional): The annotation data
                of every sample. Defaults to None.
            mode (str): Return what kind of value. Defaults to 'loss'.

        Returns:
            The return type depends on ``mode``.

            - If ``mode="loss"``, return a dict of tensor.
            - If ``mode="predict"``, return a list of :obj:`DataSample`.

        Raises:
            RuntimeError: If ``mode`` is neither "loss" nor "predict".
        """
        if mode == 'loss':
            return self.loss(images, data_samples)
        elif mode == 'predict':
            return self.predict(images, data_samples)
        else:
            raise RuntimeError(f'Invalid mode "{mode}".')

    def predict(self,
                images: torch.Tensor,
                data_samples: Optional[list] = None,
                **kwargs) -> List[DataSample]:
        """Predict captions from a batch of inputs.

        Args:
            images (torch.Tensor): The input tensor with shape
                (N, C, ...) in general.
            data_samples (List[DataSample], optional): The annotation
                data of every samples. Defaults to None.
            **kwargs: Accepted for interface compatibility; not used by
                this method.

        Returns:
            List[DataSample]: Return list of data samples, each with the
            decoded text stored in ``pred_caption``.
        """
        device = images.device

        # extract image features from the vision backbone
        image_embeds = self.ln_vision_backbone(self.vision_backbone(images)[0])
        # Allocate the all-ones masks directly on the target device instead
        # of building them on CPU and copying them over.
        image_atts = torch.ones(
            image_embeds.size()[:-1], dtype=torch.long, device=device)

        # distill image features to query tokens
        query_tokens = self.query_tokens.expand(image_embeds.size(0), -1, -1)
        query_outputs = self.multimodal_backbone.bert(
            query_embeds=query_tokens,
            encoder_hidden_states=image_embeds,
            encoder_attention_mask=image_atts,
            return_dict=True,
        )
        inputs_opt = self.vision_neck([query_outputs.last_hidden_state])
        attns_opt = torch.ones(
            inputs_opt.size()[:-1], dtype=torch.long, device=device)

        # Same prompt for every image in the batch.
        prompt = [self.prompt] * image_embeds.size(0)

        opt_tokens = self.tokenizer(prompt, return_tensors='pt').to(device)
        input_ids = opt_tokens.input_ids
        # The mask covers the query embeddings followed by the prompt tokens.
        attention_mask = torch.cat([attns_opt, opt_tokens.attention_mask],
                                   dim=1)

        query_embeds = inputs_opt
        outputs = self.text_backbone.generate(
            input_ids=input_ids,
            query_embeds=query_embeds,
            attention_mask=attention_mask,
            do_sample=False,
            top_p=0.9,
            temperature=1.,
            num_beams=5,
            max_new_tokens=self.max_txt_len,
            min_length=1,
            eos_token_id=self.eos_token_id,
            repetition_penalty=1.0,
            length_penalty=1.0,
            num_return_sequences=self.num_captions,
        )

        # Strip the prompt tokens before decoding.
        output_text = self.tokenizer.batch_decode(
            outputs[:, self.prompt_length:], skip_special_tokens=True)
        output_text = [text.strip() for text in output_text]

        out_data_samples = []
        if data_samples is None:
            data_samples = [None] * len(output_text)

        # NOTE(review): with ``num_captions > 1`` the generator presumably
        # returns ``N * num_captions`` sequences, while ``zip`` stops at the
        # shorter input, so the pairing below would no longer be one caption
        # per matching sample. Confirm the intended output contract before
        # using ``num_captions > 1``.
        for data_sample, decode_token in zip(data_samples, output_text):
            if data_sample is None:
                data_sample = DataSample()
            data_sample.pred_caption = decode_token
            out_data_samples.append(data_sample)

        return out_data_samples

    @staticmethod
    def _ignore_llm_keys_hook(module, incompatible_keys):
        """Avoid warning missing keys of the LLM model.

        Removes every missing key that starts with ``text_backbone`` from
        ``incompatible_keys.missing_keys``.

        Args:
            module: The module the state dict was loaded into (unused).
            incompatible_keys: Object exposing a mutable ``missing_keys``
                list, which is filtered in place.
        """
        missing_keys = incompatible_keys.missing_keys
        # Slice assignment keeps the same list object (the result must be
        # visible to whoever passed it in) and filters in a single pass
        # rather than calling ``list.remove`` once per key.
        missing_keys[:] = [
            key for key in missing_keys
            if not key.startswith('text_backbone')
        ]