from transformers import PreTrainedModel
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from .config_gzipembed import *
from tqdm.auto import tqdm
import numpy as np
import torch
import gzip
import multiprocessing


class GZIPEmbeddingModel(PreTrainedModel):
    config_class = GZIPEmbeddingConfig

    def __init__(self, config):
        super().__init__(config)
        if config.reduction:
            # Learned projection from the raw NCD vector (one entry per corpus
            # document) down to the configured embedding dimension.
            self.reduction_head = torch.nn.Linear(len(config.corpus), config.reduced_dimension)
        else:
            # No reduction: register a placeholder parameter so the module
            # still owns at least one tensor.
            self.reduction_head = None
            self.dummy_parameter = torch.nn.Parameter(torch.ones(1))

    def forward(self, prompt, num_procs=16, return_tensor=True):
        # The worker function and the current prompt are promoted to module
        # globals so the forked worker processes can resolve them by name
        # (this relies on the "fork" start method, the default on Linux).
        global calculate_ncd_row
        global p

        def calculate_ncd_row(data_row):
            i = data_row[0]
            row = self.ncd(data_row[1], p)
            return i, row

        if isinstance(prompt, str):
            prompt = [prompt]
        x = []
        for p in prompt:
            # One NCD value per corpus document, computed in parallel.
            ncd = [0] * len(self.config.corpus)
            with multiprocessing.Pool(num_procs) as pool:
                data = enumerate(self.config.corpus)
                results = pool.map(calculate_ncd_row, data)
            for i, row in results:
                ncd[i] = row
            x.append(ncd)
        if self.reduction_head is not None:
            x = torch.tensor(x)
            x = x.to(self.reduction_head.dtype).to(self.reduction_head.device)
            return self.reduction_head(x)
        return x if not return_tensor else torch.tensor(x)
    
    def encode(self, sentences, batch_size=32, **kwargs):
        """
        Returns a list of embeddings for the given sentences.

        Args:
            sentences (`List[str]`): List of sentences to encode
            batch_size (`int`): Number of worker processes used for the NCD
                computation (forwarded to `forward` as `num_procs`)

        Returns:
            `List[np.ndarray]`: List of embeddings for the given sentences
        """
        x = self.forward(sentences, num_procs=batch_size, return_tensor=False)
        if torch.is_tensor(x):
            # When the reduction head is active, forward returns a tensor;
            # detach it before converting to NumPy arrays.
            x = x.detach().cpu().tolist()
        return [np.array(i) for i in x]

    def normalize(self, x):
        # Lowercase, keep only ASCII letters and spaces, tokenize, then drop stop words.
        x = ''.join([char for char in x.lower() if char in "abcdefghijklmnopqrstuvwxyz "])
        x = word_tokenize(x)
        x = [w for w in x if w not in self.config.stop_words]
        return ' '.join(x)

    def ncd(self, x, y):
        """Normalized Compression Distance: (C(xy) - min(C(x), C(y))) / max(C(x), C(y))."""
        _x = self.normalize(x) if self.config.normalize else x
        _y = self.normalize(y) if (not self.config.normalized_corpus) and self.config.normalize else y
        x_c = len(gzip.compress(_x.encode()))
        y_c = len(gzip.compress(_y.encode()))
        xy_c = len(gzip.compress(f"{_x} {_y}".encode()))
        return (xy_c - min(x_c, y_c)) / max(x_c, y_c)

    def gzip_embed(
        self,
        corpus,
        document,
        verbose=False,
    ):
        # Embed a document as its NCD against every reference document in the corpus.
        embedding = []
        for reference_document in (corpus if not verbose else tqdm(corpus)):
            embedding.append(self.ncd(reference_document, document))
        return embedding

    def dimensionality(self):
        return len(self.config.corpus)
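
# Usage sketch (illustrative only, kept as comments): how this model might be
# constructed and queried. The GZIPEmbeddingConfig constructor lives in
# config_gzipembed and its exact signature is assumed here; `corpus`,
# `reduction`, `normalize`, `normalized_corpus`, `stop_words`, and
# `reduced_dimension` are the config fields this file reads.
#
#   config = GZIPEmbeddingConfig(
#       corpus=["first reference document", "second reference document"],
#       reduction=False,
#       normalize=False,
#       normalized_corpus=False,
#       stop_words=set(),
#   )
#   model = GZIPEmbeddingModel(config)
#   embeddings = model.encode(["a query sentence"], batch_size=4)
#   # Without a reduction head, each embedding has one NCD value per corpus document.
#   assert len(embeddings[0]) == model.dimensionality()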