Spaces:
Sleeping
Sleeping
import logging | |
from typing import Callable | |
import numpy as np | |
import torch | |
import torch.nn.functional as F | |
from torch.utils.data import DataLoader | |
from transformers import (AutoModel, AutoTokenizer, BatchEncoding, | |
XLMRobertaModel, PreTrainedTokenizer, PreTrainedTokenizerFast) | |
from transformers.modeling_outputs import \ | |
BaseModelOutputWithPoolingAndCrossAttentions as EncoderOutput | |
from common.decorators import singleton | |
logger = logging.getLogger(__name__) | |
class EmbeddingExtractor: | |
"""Класс обрабатывает текст вопроса и возвращает embedding""" | |
def __init__( | |
self, | |
model_id: str | None, | |
device: str | torch.device | None = None, | |
batch_size: int = 1, | |
do_normalization: bool = True, | |
max_len: int = 510, | |
model: XLMRobertaModel = None, | |
tokenizer: PreTrainedTokenizer | PreTrainedTokenizerFast = None | |
): | |
""" | |
Класс, соединяющий в себе модель, токенизатор и параметры векторизации. | |
Args: | |
model_id: Идентификатор модели. | |
device: Устройство для вычислений (по умолчанию - GPU, если доступен). | |
batch_size: Размер батча (по умолчанию - 1). | |
do_normalization: Нормировать ли вектора (по умолчанию - True). | |
max_len: Максимальная длина текста в токенах (по умолчанию - 510). | |
model: Экземпляр загруженной модели. | |
tokenizer: Экземпляр загруженного токенизатора. | |
""" | |
if device is None: | |
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
else: | |
device = torch.device(device) | |
self.device = device | |
# Инициализация модели | |
if model is not None and tokenizer is not None: | |
self.tokenizer = tokenizer | |
self.model = model | |
elif model_id is not None: | |
print('EmbeddingExtractor: model loading '+model_id+' to '+str(self.device)) | |
self.tokenizer = AutoTokenizer.from_pretrained(model_id, local_files_only=True) | |
self.model: XLMRobertaModel = AutoModel.from_pretrained(model_id, local_files_only=True).to( | |
self.device | |
) | |
print('EmbeddingExtractor: model loaded') | |
self.model.eval() | |
self.model.share_memory() | |
self.batch_size = batch_size if device.type != 'cpu' else 1 | |
self.do_normalization = do_normalization | |
self.max_len = max_len | |
def _average_pool( | |
last_hidden_states: torch.Tensor, attention_mask: torch.Tensor | |
) -> torch.Tensor: | |
""" | |
Расчёт усредненного эмбеддинга по всем токенам | |
Args: | |
last_hidden_states: Матрица эмбеддингов отдельных токенов размерности (batch_size, seq_len, embedding_size) - последний скрытый слой | |
attention_mask: Маска, чтобы не учитывать при усреднении пустые токены | |
Returns: | |
torch.Tensor - Усредненный эмбеддинг размерности (batch_size, embedding_size) | |
""" | |
last_hidden = last_hidden_states.masked_fill( | |
~attention_mask[..., None].bool(), 0.0 | |
) | |
return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None] | |
def _query_tokenization(self, text: str | list[str]) -> BatchEncoding: | |
""" | |
Преобразует текст в токены. | |
Args: | |
text: Текст. | |
max_len: Максимальная длина текста (510 токенов) | |
Returns: | |
BatchEncoding - Словарь с ключами "input_ids", "attention_mask" и т.п. | |
""" | |
if isinstance(text, str): | |
cleaned_text = text.replace('\n', ' ') | |
else: | |
cleaned_text = [t.replace('\n', ' ') for t in text] | |
return self.tokenizer( | |
cleaned_text, | |
return_tensors='pt', | |
padding=True, | |
truncation=True, | |
max_length=self.max_len, | |
) | |
def query_embed_extraction( | |
self, | |
text: str, | |
do_normalization: bool = True, | |
) -> np.ndarray: | |
""" | |
Функция преобразует один текст в эмбеддинг размерности (1, embedding_size) | |
Args: | |
text: Текст. | |
do_normalization: Нормировать ли вектора embedding | |
Returns: | |
np.array - Эмбеддинг размерности (1, embedding_size) | |
""" | |
inputs = self._query_tokenization(text).to(self.device) | |
outputs = self.model(**inputs) | |
mask = inputs["attention_mask"] | |
embedding = self._average_pool(outputs.last_hidden_state, mask) | |
if do_normalization: | |
embedding = F.normalize(embedding, dim=-1) | |
return embedding.cpu().numpy() | |
def vectorize( | |
self, | |
texts: list[str] | str, | |
progress_callback: Callable[[int, int], None] | None = None, | |
) -> np.ndarray: | |
""" | |
Векторизует все тексты в списке. | |
Во многом аналогичен методу query_embed_extraction, в будущем стоит объединить их. | |
Args: | |
texts: Список текстов или один текст. | |
progress_callback: Функция, которая будет вызываться при каждом шаге векторизации. | |
Принимает два аргумента: current и total. | |
current - текущий шаг векторизации. | |
total - общее количество шагов векторизации. | |
Returns: | |
np.array - Матрица эмбеддингов размерности (texts_count, embedding_size) | |
""" | |
if isinstance(texts, str): | |
texts = [texts] | |
loader = DataLoader(texts, batch_size=self.batch_size) | |
embeddings = [] | |
logger.info( | |
'Vectorizing texts with batch size %d on %s', self.batch_size, self.device | |
) | |
for i, batch in enumerate(loader): | |
embeddings.append(self._vectorize_batch(batch)) | |
if progress_callback is not None: | |
progress_callback(i * self.batch_size, len(texts)) | |
else: | |
logger.info('Vectorized batch %d', i) | |
logger.info('Vectorized all %d batches', len(embeddings)) | |
result = torch.cat(embeddings).numpy() | |
# Всегда возвращаем двумерный массив | |
if result.ndim == 1: | |
result = result.reshape(1, -1) | |
return result | |
def _vectorize_batch( | |
self, | |
texts: list[str], | |
) -> torch.Tensor: | |
""" | |
Векторизует один батч текстов. | |
Args: | |
texts: Список текстов. | |
Returns: | |
torch.Tensor - Матрица эмбеддингов размерности (batch_size, embedding_size) | |
""" | |
tokenized = self._query_tokenization(texts).to(self.device) | |
outputs: EncoderOutput = self.model(**tokenized) | |
mask = tokenized["attention_mask"] | |
embedding = self._average_pool(outputs.last_hidden_state, mask) | |
if self.do_normalization: | |
embedding = F.normalize(embedding, dim=-1) | |
return embedding.cpu() | |
def get_dim(self) -> int: | |
""" | |
Возвращает размерность эмбеддинга. | |
""" | |
return self.model.config.hidden_size | |