"""
Tests that models trained on the STSbenchmark and AllNLI datasets reach the expected scores on the STSbenchmark test set
"""
import csv
import gzip
import os
import unittest

from torch.utils.data import DataLoader

from sentence_transformers import SentenceTransformer, SentencesDataset, losses, models, util
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator
from sentence_transformers.readers import InputExample


class PretrainedSTSbTest(unittest.TestCase):
    def setUp(self):
        sts_dataset_path = 'datasets/stsbenchmark.tsv.gz'
        if not os.path.exists(sts_dataset_path):
            util.http_get('https://sbert.net/datasets/stsbenchmark.tsv.gz', sts_dataset_path)

        nli_dataset_path = 'datasets/AllNLI.tsv.gz'
        if not os.path.exists(nli_dataset_path):
            util.http_get('https://sbert.net/datasets/AllNLI.tsv.gz', nli_dataset_path)

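        # Read up to max_train_samples sentence pairs from the AllNLI train split,
        # mapping the textual labels to integer class ids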
label2int = {"contradiction": 0, "entailment": 1, "neutral": 2} |
|
self.nli_train_samples = [] |
|
max_train_samples = 10000 |
|
with gzip.open(nli_dataset_path, 'rt', encoding='utf8') as fIn: |
|
reader = csv.DictReader(fIn, delimiter='\t', quoting=csv.QUOTE_NONE) |
|
for row in reader: |
|
if row['split'] == 'train': |
|
label_id = label2int[row['label']] |
|
self.nli_train_samples.append(InputExample(texts=[row['sentence1'], row['sentence2']], label=label_id)) |
|
if len(self.nli_train_samples) >= max_train_samples: |
|
break |
|
|
|
|
|
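        # Read the STSbenchmark train/dev/test splits; gold similarity scores are
        # normalized from the 0-5 range to 0-1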
        self.stsb_train_samples = []
        self.dev_samples = []
        self.test_samples = []
        with gzip.open(sts_dataset_path, 'rt', encoding='utf8') as fIn:
            reader = csv.DictReader(fIn, delimiter='\t', quoting=csv.QUOTE_NONE)
            for row in reader:
                score = float(row['score']) / 5.0
                inp_example = InputExample(texts=[row['sentence1'], row['sentence2']], label=score)

                if row['split'] == 'dev':
                    self.dev_samples.append(inp_example)
                elif row['split'] == 'test':
                    self.test_samples.append(inp_example)
                else:
                    self.stsb_train_samples.append(inp_example)

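    # Scores a model on the STSb test split (scaled to 0-100) and compares it against the expected value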
    def evaluate_stsb_test(self, model, expected_score):
        evaluator = EmbeddingSimilarityEvaluator.from_input_examples(self.test_samples, name='sts-test')
        score = model.evaluate(evaluator) * 100
        print("STS-Test Performance: {:.2f} vs. exp: {:.2f}".format(score, expected_score))
        # Pass if the score beats the expectation or falls within 0.1 points of it
        assert score > expected_score or abs(score - expected_score) < 0.1

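    # Trains a DistilBERT bi-encoder (mean pooling) on the STSb train split with cosine-similarity loss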
    def test_train_stsb(self):
        word_embedding_model = models.Transformer('distilbert-base-uncased')
        pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
        model = SentenceTransformer(modules=[word_embedding_model, pooling_model])
        train_dataset = SentencesDataset(self.stsb_train_samples, model)
        train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
        train_loss = losses.CosineSimilarityLoss(model=model)
        model.fit(train_objectives=[(train_dataloader, train_loss)],
                  evaluator=None,
                  epochs=1,
                  evaluation_steps=1000,
                  warmup_steps=int(len(train_dataloader) * 0.1),
                  use_amp=True)

        self.evaluate_stsb_test(model, 80.0)

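    # Trains the same architecture on up to 10k AllNLI pairs with softmax classification loss,
    # then checks transfer performance on the STSb test split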
    def test_train_nli(self):
        word_embedding_model = models.Transformer('distilbert-base-uncased')
        pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
        model = SentenceTransformer(modules=[word_embedding_model, pooling_model])
        train_dataset = SentencesDataset(self.nli_train_samples, model=model)
        train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
        train_loss = losses.SoftmaxLoss(model=model, sentence_embedding_dimension=model.get_sentence_embedding_dimension(), num_labels=3)
        model.fit(train_objectives=[(train_dataloader, train_loss)],
                  evaluator=None,
                  epochs=1,
                  warmup_steps=int(len(train_dataloader) * 0.1),
                  use_amp=True)

        self.evaluate_stsb_test(model, 50.0)


if __name__ == "__main__":
    unittest.main()