File size: 6,435 Bytes
2359bda |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
from . import SentenceEvaluator, SimilarityFunction
import logging
import os
import csv
from sklearn.metrics.pairwise import paired_cosine_distances, paired_euclidean_distances, paired_manhattan_distances
from typing import List
from ..readers import InputExample
logger = logging.getLogger(__name__)
class TripletEvaluator(SentenceEvaluator):
"""
Evaluate a model based on a triplet: (sentence, positive_example, negative_example).
Checks if distance(sentence, positive_example) < distance(sentence, negative_example).
"""
def __init__(
self,
anchors: List[str],
positives: List[str],
negatives: List[str],
main_distance_function: SimilarityFunction = None,
name: str = "",
batch_size: int = 16,
show_progress_bar: bool = False,
write_csv: bool = True,
):
"""
:param anchors: Sentences to check similarity to. (e.g. a query)
:param positives: List of positive sentences
:param negatives: List of negative sentences
:param main_distance_function: One of 0 (Cosine), 1 (Euclidean) or 2 (Manhattan). Defaults to None, returning all 3.
:param name: Name for the output
:param batch_size: Batch size used to compute embeddings
:param show_progress_bar: If true, prints a progress bar
:param write_csv: Write results to a CSV file
"""
self.anchors = anchors
self.positives = positives
self.negatives = negatives
self.name = name
assert len(self.anchors) == len(self.positives)
assert len(self.anchors) == len(self.negatives)
self.main_distance_function = main_distance_function
self.batch_size = batch_size
if show_progress_bar is None:
show_progress_bar = (
logger.getEffectiveLevel() == logging.INFO or logger.getEffectiveLevel() == logging.DEBUG
)
self.show_progress_bar = show_progress_bar
self.csv_file: str = "triplet_evaluation" + ("_" + name if name else "") + "_results.csv"
self.csv_headers = ["epoch", "steps", "accuracy_cosinus", "accuracy_manhattan", "accuracy_euclidean"]
self.write_csv = write_csv
@classmethod
def from_input_examples(cls, examples: List[InputExample], **kwargs):
anchors = []
positives = []
negatives = []
for example in examples:
anchors.append(example.texts[0])
positives.append(example.texts[1])
negatives.append(example.texts[2])
return cls(anchors, positives, negatives, **kwargs)
def __call__(self, model, output_path: str = None, epoch: int = -1, steps: int = -1) -> float:
if epoch != -1:
if steps == -1:
out_txt = " after epoch {}:".format(epoch)
else:
out_txt = " in epoch {} after {} steps:".format(epoch, steps)
else:
out_txt = ":"
logger.info("TripletEvaluator: Evaluating the model on " + self.name + " dataset" + out_txt)
num_triplets = 0
num_correct_cos_triplets, num_correct_manhattan_triplets, num_correct_euclidean_triplets = 0, 0, 0
embeddings_anchors = model.encode(
self.anchors, batch_size=self.batch_size, show_progress_bar=self.show_progress_bar, convert_to_numpy=True
)
embeddings_positives = model.encode(
self.positives, batch_size=self.batch_size, show_progress_bar=self.show_progress_bar, convert_to_numpy=True
)
embeddings_negatives = model.encode(
self.negatives, batch_size=self.batch_size, show_progress_bar=self.show_progress_bar, convert_to_numpy=True
)
# Cosine distance
pos_cos_distance = paired_cosine_distances(embeddings_anchors, embeddings_positives)
neg_cos_distances = paired_cosine_distances(embeddings_anchors, embeddings_negatives)
# Manhattan
pos_manhattan_distance = paired_manhattan_distances(embeddings_anchors, embeddings_positives)
neg_manhattan_distances = paired_manhattan_distances(embeddings_anchors, embeddings_negatives)
# Euclidean
pos_euclidean_distance = paired_euclidean_distances(embeddings_anchors, embeddings_positives)
neg_euclidean_distances = paired_euclidean_distances(embeddings_anchors, embeddings_negatives)
for idx in range(len(pos_cos_distance)):
num_triplets += 1
if pos_cos_distance[idx] < neg_cos_distances[idx]:
num_correct_cos_triplets += 1
if pos_manhattan_distance[idx] < neg_manhattan_distances[idx]:
num_correct_manhattan_triplets += 1
if pos_euclidean_distance[idx] < neg_euclidean_distances[idx]:
num_correct_euclidean_triplets += 1
accuracy_cos = num_correct_cos_triplets / num_triplets
accuracy_manhattan = num_correct_manhattan_triplets / num_triplets
accuracy_euclidean = num_correct_euclidean_triplets / num_triplets
logger.info("Accuracy Cosine Distance: \t{:.2f}".format(accuracy_cos * 100))
logger.info("Accuracy Manhattan Distance:\t{:.2f}".format(accuracy_manhattan * 100))
logger.info("Accuracy Euclidean Distance:\t{:.2f}\n".format(accuracy_euclidean * 100))
if output_path is not None and self.write_csv:
csv_path = os.path.join(output_path, self.csv_file)
if not os.path.isfile(csv_path):
with open(csv_path, newline="", mode="w", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(self.csv_headers)
writer.writerow([epoch, steps, accuracy_cos, accuracy_manhattan, accuracy_euclidean])
else:
with open(csv_path, newline="", mode="a", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow([epoch, steps, accuracy_cos, accuracy_manhattan, accuracy_euclidean])
if self.main_distance_function == SimilarityFunction.COSINE:
return accuracy_cos
if self.main_distance_function == SimilarityFunction.MANHATTAN:
return accuracy_manhattan
if self.main_distance_function == SimilarityFunction.EUCLIDEAN:
return accuracy_euclidean
return max(accuracy_cos, accuracy_manhattan, accuracy_euclidean)
|