from typing import List, Union

import PIL
import torch

from transformers import BatchFeature
from transformers.image_utils import ImageInput
from transformers.processing_utils import ProcessingKwargs, ProcessorMixin, Unpack
from transformers.tokenization_utils_base import TextInput, PreTokenizedInput

# Sentinel ids used to splice image content into the token stream; they are
# replaced by visual embeddings inside the model.
IGNORE_ID = -100
IMAGE_TOKEN_ID = -200
IMAGE_TOKEN = "<image>"
IMAGE_ATOM_ID = -300
IMAGE_INDICATOR_IDS = [-301, -302, -303, -304, -305]
|
|
class OvisProcessorKwargs(ProcessingKwargs, total=False):
    _defaults = {
        "text_kwargs": {
            "padding": False,
        },
        "images_kwargs": {
            "max_partition": 9,
            "covering_threshold": 0.9,
            "convert_to_rgb": True,
            "return_tensors": "pt",
        },
    }
|
|
class OvisProcessor(ProcessorMixin):
    r"""
    Constructs an Ovis processor which wraps an Ovis image processor and a Qwen2 tokenizer into a single processor.

    [`OvisProcessor`] offers all the functionalities of [`Qwen2VLImageProcessor`] and [`Qwen2TokenizerFast`]. See the
    [`~OvisProcessor.__call__`] and [`~OvisProcessor.decode`] for more information.

    Args:
        image_processor ([`Qwen2VLImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`Qwen2TokenizerFast`], *optional*):
            The tokenizer is a required input.
        chat_template (`str`, *optional*):
            A Jinja template which will be used to convert lists of messages in a chat into a tokenizable string.
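
    Example (a minimal usage sketch; the checkpoint id below is illustrative):

    ```python
    >>> from PIL import Image
    >>> from transformers import AutoProcessor

    >>> processor = AutoProcessor.from_pretrained("AIDC-AI/Ovis2-1B")  # illustrative checkpoint id
    >>> image = Image.new("RGB", (448, 448))
    >>> inputs = processor(images=[image], text="<image> Describe this image.")
    >>> sorted(inputs.keys())
    ['attention_mask', 'input_ids', 'pixel_values']
    ```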
|
""" |
|
|
|
attributes = ["image_processor", "tokenizer"] |
|
valid_kwargs = ["chat_template"] |
|
|
|
image_processor_class = "AutoImageProcessor" |
|
tokenizer_class = ("Qwen2Tokenizer", "Qwen2TokenizerFast") |
|
|
|
def __init__(self, image_processor=None, tokenizer=None, chat_template=None, **kwargs): |
|
self.image_token = "<|image_pad|>" if not hasattr(tokenizer, "image_token") else tokenizer.image_token |
|
self.video_token = "<|video_pad|>" if not hasattr(tokenizer, "video_token") else tokenizer.video_token |
|
super().__init__(image_processor, tokenizer, chat_template=chat_template) |
|
|
|
    def __call__(
        self,
        images: ImageInput = None,
        text: Union[TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput]] = None,
        **kwargs: Unpack[OvisProcessorKwargs],
    ) -> BatchFeature:
        """
        Main method to prepare one or several sequence(s) and image(s) for the model. This method forwards the `text`
        and `kwargs` arguments to Qwen2TokenizerFast's [`~Qwen2TokenizerFast.__call__`] if `text` is not `None` to
        encode the text. To prepare the vision inputs, this method forwards the `images` and `kwargs` arguments to
        the image processor if `images` is not `None`.

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. Both channels-first and channels-last formats are supported.
            text (`str`, `List[str]`, `List[List[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:
                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
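
        Example (a sketch; assumes the `processor` and `image` from the class-level example):

        ```python
        >>> prompts = ["<image> What is shown here?", "Describe the colors."]
        >>> inputs = processor(images=[image], text=prompts)
        >>> inputs["input_ids"].shape[0]  # one right-padded row per prompt
        2
        ```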
|
""" |
|
        output_kwargs = self._merge_kwargs(
            OvisProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        image_features = {}
        if images is not None:
            processed_images = []
            image_placeholders_list = []

            for image in images if isinstance(images, list) else [images]:
                pixel_values, image_placeholders = self.preprocess_image(
                    image=image, **output_kwargs["images_kwargs"]
                )
                processed_images.append(pixel_values)
                image_placeholders_list.append(image_placeholders)

            if processed_images:
                # Keep the pixel values alongside the placeholders so that the
                # image-only path (no `text`) also returns them.
                image_features["pixel_values"] = processed_images
                image_features["image_placeholders"] = image_placeholders_list
|
        if text is not None:
            if not isinstance(text, list):
                text = [text]

            input_ids_list = []
            attention_mask_list = []

            for idx, txt in enumerate(text):
                # Split the prompt on the literal image tag so the image placeholder
                # ids can be spliced in between the text segments.
                text_parts = txt.split(IMAGE_TOKEN)

                token_ids = []
                for i, part in enumerate(text_parts):
                    text_tokens = self.tokenizer(part, **output_kwargs["text_kwargs"])
                    token_ids.extend(text_tokens.input_ids)

                    # After every segment except the last, insert the placeholder ids
                    # for the image associated with this prompt.
                    if i < len(text_parts) - 1 and "image_placeholders" in image_features:
                        if idx < len(image_features["image_placeholders"]):
                            token_ids.extend(image_features["image_placeholders"][idx])

                input_ids_list.append(torch.tensor(token_ids, dtype=torch.long))
                attention_mask_list.append(torch.ones(len(token_ids), dtype=torch.long))

            # Right-pad every sequence in the batch to the length of the longest one.
            all_input_ids = torch.nn.utils.rnn.pad_sequence(
                input_ids_list, batch_first=True, padding_value=self.tokenizer.pad_token_id
            )
            all_attention_mask = torch.nn.utils.rnn.pad_sequence(
                attention_mask_list, batch_first=True, padding_value=0
            )

            output = BatchFeature(
                data={
                    "input_ids": all_input_ids,
                    "attention_mask": all_attention_mask,
                }
            )

            if "pixel_values" in image_features:
                output["pixel_values"] = image_features["pixel_values"]

            return output

        # Image-only path: no text was provided.
        return BatchFeature(data=image_features)
|
    def get_image_size(self):
        height = self.image_processor.crop_size["height"]
        width = self.image_processor.crop_size["width"]
        return height, width
|
    @staticmethod
    def construct_image_placeholders(grid):
        # Global image atom wrapped in begin/end indicator ids.
        image_placeholders = [IMAGE_INDICATOR_IDS[0], IMAGE_ATOM_ID, IMAGE_INDICATOR_IDS[1]]
        if grid[0] * grid[1] > 1:
            # One atom per crop, in row-major order, with column separators inside
            # a row and a row separator between rows.
            for r in range(grid[0]):
                for c in range(grid[1]):
                    image_placeholders.append(IMAGE_ATOM_ID)
                    if c < grid[1] - 1:
                        image_placeholders.append(IMAGE_INDICATOR_IDS[2])
                if r < grid[0] - 1:
                    image_placeholders.append(IMAGE_INDICATOR_IDS[3])
        image_placeholders.append(IMAGE_INDICATOR_IDS[4])
        return image_placeholders
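
    # Worked example (a sketch, derived from the loop above):
    #
    #     >>> OvisProcessor.construct_image_placeholders((2, 2))
    #     [-301, -300, -302, -300, -303, -300, -304, -300, -303, -300, -305]
    #
    # i.e. a global image atom wrapped in begin/end indicators, then row-major
    # crop atoms separated by column (-303) and row (-304) indicators, closed by
    # the terminator id (-305).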
|
    def preprocess_image(self, image: PIL.Image.Image, max_partition, covering_threshold, convert_to_rgb, return_tensors):
        def _preprocess(img: PIL.Image.Image, side):
            # Resize so the longer edge equals `side`, preserving the aspect ratio.
            w, h = img.size
            if w == h:
                new_width = new_height = side
            elif w > h:
                new_width = side
                new_height = int(h / w * new_width)
            else:
                new_height = side
                new_width = int(w / h * new_height)
            new_size = dict(height=new_height, width=new_width)
            pixel_values = self.image_processor.preprocess(img, size=new_size, return_tensors=return_tensors)["pixel_values"]

            # Center the resized image on a square (1, 3, side, side) canvas,
            # zero-padding the shorter dimension.
            square_values = torch.zeros([1, 3, side, side], dtype=pixel_values.dtype, device=pixel_values.device)
            new_height, new_width = pixel_values.shape[2:]
            if new_height == new_width:
                square_values[:, :, :, :] = pixel_values
            elif new_height > new_width:
                from_index = (side - new_width) // 2
                square_values[:, :, :, from_index:from_index + new_width] = pixel_values
            else:
                from_index = (side - new_height) // 2
                square_values[:, :, from_index:from_index + new_height, :] = pixel_values

            return square_values
|
        def _partition(img, grid):
            # Split the image into grid[0] x grid[1] crop boxes; the last row and
            # column absorb any remainder pixels.
            w, h = img.size
            row_height = h // grid[0]
            col_width = w // grid[1]

            partition = []
            for row in range(grid[0]):
                for col in range(grid[1]):
                    left = col * col_width
                    upper = row * row_height
                    right = w if col == grid[1] - 1 else (col + 1) * col_width
                    lower = h if row == grid[0] - 1 else (row + 1) * row_height
                    partition.append((left, upper, right, lower))

            return partition
|
        def _covering_area(left, upper, right, lower, side):
            # Area the crop retains after being scaled down so that its longer
            # edge fits within `side`.
            w = right - left
            h = lower - upper
            w, h = max(w, h), min(w, h)
            if w > side:
                h = h / w * side
                w = side
            return w * h
|
        def _get_best_grid(img, side):
            img_area = img.size[0] * img.size[1]

            # All grids with at most `max_partition` cells.
            candidate_grids = []
            for i in range(1, max_partition + 1):
                for j in range(1, max_partition + 1):
                    if i * j <= max_partition:
                        candidate_grids.append((i, j))

            all_grids = []
            good_grids = []
            for grid in candidate_grids:
                partition = _partition(img, grid)
                covering_ratio = sum([_covering_area(*p, side) for p in partition]) / img_area
                assert covering_ratio <= 1.0
                all_grids.append((grid, covering_ratio))
                if covering_ratio > covering_threshold:
                    good_grids.append((grid, covering_ratio))

            if len(good_grids) > 0:
                # Among grids that cover enough of the image, prefer the fewest
                # cells, breaking ties by higher covering ratio.
                return sorted(good_grids, key=lambda x: (x[0][0] * x[0][1], -x[1]))[0][0]
            else:
                # Otherwise fall back to the best-covering grid, preferring fewer cells.
                return sorted(all_grids, key=lambda x: (-x[1], x[0][0] * x[0][1]))[0][0]
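
        # Worked example (a sketch, derived from the helpers above): for an
        # 896x448 image with side=448, the (1, 2) grid tiles the image into two
        # 448x448 crops with covering ratio 1.0, so it is the smallest grid that
        # clears the default 0.9 threshold and is selected.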
|
        if convert_to_rgb and image.mode != "RGB":
            image = image.convert("RGB")

        sides = self.get_image_size()
        if sides[0] != sides[1]:
            raise ValueError("get_image_size() returns non-square size")
        side = sides[0]

        grid = _get_best_grid(image, side)
        partition = _partition(image, grid)
        crops = [image.crop(p) for p in partition]
        if len(crops) > 1:
            # Prepend the full image as a global view when there is more than one crop.
            crops.insert(0, image)
        pixel_values = torch.cat([_preprocess(crop, side) for crop in crops], dim=0)
        image_placeholders = self.construct_image_placeholders(grid)
        return pixel_values, image_placeholders
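
    # Shape note for preprocess_image (derived from the code above): each crop is
    # preprocessed to a (1, 3, side, side) tensor, so `pixel_values` has shape
    # (N, 3, side, side) with N = grid[0] * grid[1] crops, plus one leading global
    # view whenever more than one crop is produced.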
|
    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
        the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)
|
    def post_process_image_text_to_text(self, generated_outputs):
        """
        Post-process the output of the model to decode the text.

        Args:
            generated_outputs (`torch.Tensor` or `np.ndarray`):
                The output of the model `generate` function. The output is expected to be a tensor of shape
                `(batch_size, sequence_length)` or `(sequence_length,)`.

        Returns:
            `List[str]`: The decoded text.
        """
        return self.tokenizer.batch_decode(
            generated_outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )
|
    @property
    def model_input_names(self):
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names
        return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
|
|
__all__ = ["OvisProcessor"] |
|
|