from . import SentenceEvaluator |
import logging |
import os |
import csv |
from sklearn.metrics.pairwise import paired_cosine_distances, paired_euclidean_distances, paired_manhattan_distances |
from sklearn.metrics import average_precision_score |
import numpy as np |
from typing import List |
from ..readers import InputExample |
logger = logging.getLogger(__name__) |
class BinaryClassificationEvaluator(SentenceEvaluator):
    """
    Evaluate a model based on the similarity of the embeddings by measuring how well similar and
    dissimilar sentence pairs can be told apart with a single decision threshold.

    Four scoring functions are evaluated: cosine similarity, Manhattan distance, Euclidean distance and
    dot product. For each of them the best accuracy, the best F1 (with its precision and recall), the
    thresholds achieving them, and the average precision are computed.

    The score returned by ``__call__`` is the highest average precision over the four scoring functions.

    The results are written in a CSV. If a CSV already exists, then values are appended.

    The labels need to be 0 for dissimilar pairs and 1 for similar pairs.

    :param sentences1: The first column of sentences
    :param sentences2: The second column of sentences
    :param labels: labels[i] is the label for the pair (sentences1[i], sentences2[i]). Must be 0 or 1
    :param name: Name for the output
    :param batch_size: Batch size used to compute embeddings
    :param show_progress_bar: If true, prints a progress bar. If None, a progress bar is shown when the
        effective log level of this module's logger is INFO or DEBUG
    :param write_csv: Write results to a CSV file
    """

    def __init__(self, sentences1: List[str], sentences2: List[str], labels: List[int], name: str = '',
                 batch_size: int = 32, show_progress_bar: bool = False, write_csv: bool = True):
        # Assertions (rather than other exception types) are kept so that callers
        # relying on AssertionError for malformed input keep working.
        assert len(sentences1) == len(sentences2), "sentences1 and sentences2 must have the same length"
        assert len(sentences1) == len(labels), "labels must have one entry per sentence pair"
        assert all(label == 0 or label == 1 for label in labels), "labels must be 0 or 1"

        self.sentences1 = sentences1
        self.sentences2 = sentences2
        self.labels = labels

        self.write_csv = write_csv
        self.name = name
        self.batch_size = batch_size

        if show_progress_bar is None:
            show_progress_bar = logger.getEffectiveLevel() in (logging.INFO, logging.DEBUG)
        self.show_progress_bar = show_progress_bar

        self.csv_file = "binary_classification_evaluation" + ("_" + name if name else '') + "_results.csv"

        # One column per (scoring function, metric) pair. __call__ splits each header at the
        # first underscore to look the value up, so the names must stay "<function>_<metric>".
        metric_names = ["accuracy", "accuracy_threshold", "f1", "precision", "recall", "f1_threshold", "ap"]
        self.csv_headers = ["epoch", "steps"]
        for sim_fct in ("cossim", "manhattan", "euclidean", "dot"):
            self.csv_headers.extend(f"{sim_fct}_{metric}" for metric in metric_names)

    @classmethod
    def from_input_examples(cls, examples: List[InputExample], **kwargs):
        """
        Build an evaluator from examples, using ``texts[0]`` / ``texts[1]`` as the sentence pair and
        ``label`` as the pair label.

        :param examples: Examples to evaluate on
        :param kwargs: Forwarded unchanged to the constructor
        :return: A new evaluator instance
        """
        sentences1 = []
        sentences2 = []
        scores = []

        for example in examples:
            sentences1.append(example.texts[0])
            sentences2.append(example.texts[1])
            scores.append(example.label)

        return cls(sentences1, sentences2, scores, **kwargs)

    def __call__(self, model, output_path: str = None, epoch: int = -1, steps: int = -1) -> float:
        """
        Run the evaluation, optionally append the results to the CSV file, and return the main score.

        :param model: Object whose ``encode`` method turns the sentences into embeddings
        :param output_path: Directory for the CSV file. Nothing is written if this is None or if
            ``write_csv`` is false
        :param epoch: Epoch number used in the log message and the CSV row; -1 means "not given"
        :param steps: Step number used in the log message and the CSV row; -1 means "not given"
        :return: The highest average precision over all scoring functions
        """
        if epoch == -1:
            out_txt = ":"
        elif steps == -1:
            out_txt = f" after epoch {epoch}:"
        else:
            out_txt = f" in epoch {epoch} after {steps} steps:"

        logger.info("Binary Accuracy Evaluation of the model on %s dataset%s", self.name, out_txt)

        scores = self.compute_metrices(model)
        main_score = max(sim_scores['ap'] for sim_scores in scores.values())

        if output_path is not None and self.write_csv:
            file_output_data = [epoch, steps]
            for header_name in self.csv_headers:
                # "epoch" and "steps" contain no underscore and are already in the row.
                if '_' in header_name:
                    sim_fct, metric = header_name.split("_", maxsplit=1)
                    file_output_data.append(scores[sim_fct][metric])

            csv_path = os.path.join(output_path, self.csv_file)
            write_header = not os.path.isfile(csv_path)
            # Append mode creates the file when it is missing, so one code path serves both cases.
            with open(csv_path, newline='', mode="a", encoding="utf-8") as f:
                writer = csv.writer(f)
                if write_header:
                    writer.writerow(self.csv_headers)
                writer.writerow(file_output_data)

        return main_score

    def compute_metrices(self, model):
        """
        Encode all sentences and compute the metrics for every scoring function.

        :param model: Object whose ``encode`` method turns the sentences into embeddings
        :return: Dict mapping the short scoring-function name ('cossim', 'manhattan', 'euclidean', 'dot')
            to a dict with the keys 'accuracy', 'accuracy_threshold', 'f1', 'f1_threshold', 'precision',
            'recall' and 'ap'
        """
        # Encode every distinct sentence only once. dict.fromkeys de-duplicates while keeping
        # first-seen order, so the encoding order is the same on every run.
        sentences = list(dict.fromkeys(self.sentences1 + self.sentences2))
        embeddings = model.encode(sentences, batch_size=self.batch_size,
                                  show_progress_bar=self.show_progress_bar, convert_to_numpy=True)
        emb_dict = dict(zip(sentences, embeddings))
        embeddings1 = np.asarray([emb_dict[sent] for sent in self.sentences1])
        embeddings2 = np.asarray([emb_dict[sent] for sent in self.sentences2])

        cosine_scores = 1 - paired_cosine_distances(embeddings1, embeddings2)
        manhattan_distances = paired_manhattan_distances(embeddings1, embeddings2)
        euclidean_distances = paired_euclidean_distances(embeddings1, embeddings2)
        # Row-wise dot product, kept as an array so it can be negated like the other scores.
        dot_scores = (embeddings1 * embeddings2).sum(axis=1)

        labels = np.asarray(self.labels)
        output_scores = {}
        # The last element tells whether a higher score means "more similar".
        scoring_functions = [
            ('cossim', 'Cosine-Similarity', cosine_scores, True),
            ('manhattan', 'Manhattan-Distance', manhattan_distances, False),
            ('euclidean', 'Euclidean-Distance', euclidean_distances, False),
            ('dot', 'Dot-Product', dot_scores, True),
        ]
        for short_name, name, scores, reverse in scoring_functions:
            acc, acc_threshold = self.find_best_acc_and_threshold(scores, labels, reverse)
            f1, precision, recall, f1_threshold = self.find_best_f1_and_threshold(scores, labels, reverse)
            # average_precision_score expects higher values for the positive class,
            # so distances are negated.
            ap = average_precision_score(labels, scores if reverse else -scores)

            logger.info("Accuracy with %s: %.2f\t(Threshold: %.4f)", name, acc * 100, acc_threshold)
            logger.info("F1 with %s: %.2f\t(Threshold: %.4f)", name, f1 * 100, f1_threshold)
            logger.info("Precision with %s: %.2f", name, precision * 100)
            logger.info("Recall with %s: %.2f", name, recall * 100)
            logger.info("Average Precision with %s: %.2f\n", name, ap * 100)

            output_scores[short_name] = {
                'accuracy': acc,
                'accuracy_threshold': acc_threshold,
                'f1': f1,
                'f1_threshold': f1_threshold,
                'precision': precision,
                'recall': recall,
                'ap': ap,
            }

        return output_scores

    @staticmethod
    def find_best_acc_and_threshold(scores, labels, high_score_more_similar: bool):
        """
        Find the threshold that gives the highest accuracy when every pair ranked at or before the
        cut is predicted as similar (1) and every pair after it as dissimilar (0).

        Only cuts between two consecutive ranked pairs are tried, so the split that puts all pairs on
        the "similar" side is not considered.

        :param scores: One score per pair
        :param labels: One label per pair; a value of 1 counts as positive, anything else as negative
        :param high_score_more_similar: True if a higher score means more similar, False for distances
        :return: Tuple (best accuracy, threshold). (0, -1) if no cut improves on an accuracy of 0
        """
        assert len(scores) == len(labels)
        # Accept plain sequences too: on a list, `labels == 0` would be a single bool, not a mask.
        labels = np.asarray(labels)
        rows = sorted(zip(scores, labels), key=lambda row: row[0], reverse=high_score_more_similar)

        max_acc = 0
        best_threshold = -1

        positive_so_far = 0
        remaining_negatives = int(np.sum(labels == 0))

        # Walk the ranking from most to least similar, moving the cut one pair at a time.
        for (score, label), (next_score, _) in zip(rows, rows[1:]):
            if label == 1:
                positive_so_far += 1
            else:
                remaining_negatives -= 1

            acc = (positive_so_far + remaining_negatives) / len(labels)
            if acc > max_acc:
                max_acc = acc
                # Place the threshold halfway between this pair and the next one.
                best_threshold = (score + next_score) / 2

        return max_acc, best_threshold

    @staticmethod
    def find_best_f1_and_threshold(scores, labels, high_score_more_similar: bool):
        """
        Find the threshold that gives the highest F1 when every pair ranked at or before the cut is
        predicted as similar (1).

        Only cuts between two consecutive ranked pairs are tried, so the split that puts all pairs on
        the "similar" side is not considered.

        :param scores: One score per pair
        :param labels: One label per pair; a value of 1 counts as positive
        :param high_score_more_similar: True if a higher score means more similar, False for distances
        :return: Tuple (best F1, precision at best F1, recall at best F1, threshold). All zero if no
            cut yields a positive F1
        """
        assert len(scores) == len(labels)

        scores = np.asarray(scores)
        labels = np.asarray(labels)

        rows = sorted(zip(scores, labels), key=lambda row: row[0], reverse=high_score_more_similar)

        best_f1 = best_precision = best_recall = 0
        threshold = 0
        nextract = 0
        ncorrect = 0
        total_num_duplicates = labels.sum()

        for (score, label), (next_score, _) in zip(rows, rows[1:]):
            nextract += 1

            if label == 1:
                ncorrect += 1

            # Without at least one true positive, precision + recall is 0 and F1 is undefined.
            if ncorrect > 0:
                precision = ncorrect / nextract
                recall = ncorrect / total_num_duplicates
                f1 = 2 * precision * recall / (precision + recall)
                if f1 > best_f1:
                    best_f1 = f1
                    best_precision = precision
                    best_recall = recall
                    # Place the threshold halfway between this pair and the next one.
                    threshold = (score + next_score) / 2

        return best_f1, best_precision, best_recall, threshold