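"""GZIP/NCD embedding model.

Represents a document as the vector of Normalized Compression Distances (NCD)
between that document and every reference document in a fixed corpus, optionally
followed by a learned linear reduction head.
"""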
from transformers import PreTrainedModel
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from .config_gzipembed import *
from tqdm.auto import tqdm
import torch
import gzip
import multiprocessing


class GZIPEmbeddingModel(PreTrainedModel):
    config_class = GZIPEmbeddingConfig

    def __init__(self, config):
        super().__init__(config)
        if config.reduction:
            # Optional learned projection from the raw NCD vector (one entry per
            # corpus document) down to a smaller, fixed dimension.
            self.reduction_head = torch.nn.Linear(len(config.corpus), config.reduced_dimension)
        else:
            self.reduction_head = None
        # Dummy parameter so the model exposes at least one registered parameter/device.
        self.dummy_parameter = torch.nn.Parameter(torch.ones(1))

    def forward(self, prompt, num_procs=16, return_tensor=True):
        # The worker function and the current prompt are exposed as module-level
        # globals so multiprocessing can pickle the worker; this relies on a
        # fork-based start method where children inherit the parent's globals.
        global calculate_ncd_row
        global p

        def calculate_ncd_row(data_row):
            i = data_row[0]
            row = self.ncd(data_row[1], p)
            return i, row

        if isinstance(prompt, str):
            prompt = [prompt]
        x = []
        for p in prompt:
            # One NCD value per reference document in the corpus, computed in parallel.
            ncd = [0] * len(self.config.corpus)
            with multiprocessing.Pool(num_procs) as pool:
                data = enumerate(self.config.corpus)
                results = pool.map(calculate_ncd_row, data)
                for i, row in results:
                    ncd[i] = row
            x.append(ncd)
        if self.reduction_head is not None:
            x = torch.tensor(x)
            x = x.to(self.reduction_head.dtype).to(self.reduction_head.device)
            return self.reduction_head(x)
        return x if not return_tensor else torch.tensor(x)

    def encode(self, sentences, batch_size=32, **kwargs):
        """
        Returns a list of embeddings for the given sentences.

        Args:
            sentences (`List[str]`): List of sentences to encode
            batch_size (`int`): Batch size for the encoding

        Returns:
            `List[np.ndarray]` or `List[tensor]`: List of embeddings for the given sentences
        """
        import numpy as np

        x = self.forward(sentences, num_procs=batch_size, return_tensor=False)
        # return [torch.tensor(i) for i in x]
        return [np.array(i) for i in x]

    def normalize(self, x):
        # Lowercase, strip non-alphabetic characters, and remove stop words.
        x = ''.join([char for char in x.lower() if char in "abcdefghijklmnopqrstuvwxyz "])
        x = word_tokenize(x)
        x = [w for w in x if w not in self.config.stop_words]
        return ' '.join(x)

    def ncd(self, x, y):
        # Normalized Compression Distance between a reference document `x` (from the
        # corpus) and a query document `y`:
        #   NCD(x, y) = (C(xy) - min(C(x), C(y))) / max(C(x), C(y))
        # where C(.) is the gzip-compressed length. Corpus documents that were
        # normalized ahead of time (config.normalized_corpus) are not re-normalized.
        _x = self.normalize(x) if (not self.config.normalized_corpus) and self.config.normalize else x
        _y = self.normalize(y) if self.config.normalize else y
        x_c = len(gzip.compress(_x.encode()))
        y_c = len(gzip.compress(_y.encode()))
        xy_c = len(gzip.compress(f"{_x} {_y}".encode()))
        return (xy_c - min(x_c, y_c)) / max(x_c, y_c)

    def gzip_embed(
        self,
        corpus,
        document,
        verbose=False,
    ):
        # Embed `document` as its vector of NCDs against every reference document in `corpus`.
        embedding = []
        for reference_document in (corpus if not verbose else tqdm(corpus)):
            embedding.append(self.ncd(reference_document, document))
        return embedding

    def dimensionality(self):
        return len(self.config.corpus)
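
# Usage sketch: a minimal example, assuming this module is published as a Hugging Face
# custom-code model. The repository id and the sentences below are placeholders.
#
# from transformers import AutoModel
#
# model = AutoModel.from_pretrained("your-username/gzip-embedding", trust_remote_code=True)
# embeddings = model.encode(["the quick brown fox", "gzip as an embedding model"], batch_size=8)
# print(len(embeddings), len(embeddings[0]))  # one NCD vector per sentence,
#                                             # each of length model.dimensionality()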