# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# LASER Language-Agnostic SEntence Representations
# is a toolkit to calculate multilingual sentence embeddings
# and to use them for various tasks such as document classification,
# and bitext filtering
#
# --------------------------------------------------------
#
# Tool to calculate the dual approach multilingual similarity error rate (P-xSIM)

import typing as tp
from pathlib import Path

import faiss
import numpy as np
import torch
from scipy.special import softmax
from sklearn.metrics.pairwise import cosine_similarity
from stopes.eval.auto_pcp.audio_comparator import Comparator, get_model_pred

from xsim import Margin, score_margin


def get_neighbors(
    x: np.ndarray, y: np.ndarray, k: int, margin: str
) -> tp.Tuple[np.ndarray, np.ndarray, int]:
    x_copy = x.astype(np.float32).copy()
    y_copy = y.astype(np.float32).copy()
    nbex, dim = x.shape
    # create index
    idx_x = faiss.IndexFlatIP(dim)
    idx_y = faiss.IndexFlatIP(dim)
    # L2 normalization needed for cosine distance
    faiss.normalize_L2(x_copy)
    faiss.normalize_L2(y_copy)
    idx_x.add(x_copy)
    idx_y.add(y_copy)
    if margin == Margin.ABSOLUTE.value:
        scores, indices = idx_y.search(x_copy, k)
    else:
        # return cosine similarity and indices of k closest neighbors
        Cos_xy, Idx_xy = idx_y.search(x_copy, k)
        Cos_yx, Idx_yx = idx_x.search(y_copy, k)
        # average cosines
        Avg_xy = Cos_xy.mean(axis=1)
        Avg_yx = Cos_yx.mean(axis=1)
        scores = score_margin(Cos_xy, Idx_xy, Avg_xy, Avg_yx, margin, k)
        indices = Idx_xy
    return scores, indices, nbex
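

# Illustrative sketch (not part of the original module): how the margin-based
# k-NN step above behaves on toy data. Everything here is assumed for the
# example: small random "target" embeddings and a slightly perturbed copy as
# the "source", so that row i of x corresponds to row i of y. With ratio
# margin scoring, neighbor_indices[:, 0] should then usually recover the
# matching row.
def _example_get_neighbors(dim: int = 32, n: int = 100, k: int = 4) -> None:
    rng = np.random.RandomState(0)
    y_toy = rng.rand(n, dim).astype(np.float32)
    x_toy = (y_toy + 0.01 * rng.rand(n, dim)).astype(np.float32)
    scores, neighbor_indices, nbex = get_neighbors(
        x_toy, y_toy, k=k, margin=Margin.RATIO.value
    )
    top1_acc = (neighbor_indices[:, 0] == np.arange(n)).mean()
    print(f"{nbex} examples, top-1 retrieval accuracy: {top1_acc:.2f}")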


def get_cosine_scores(src_emb: np.ndarray, neighbor_embs: np.ndarray) -> np.ndarray:
    assert src_emb.shape[0] == neighbor_embs.shape[1]
    src_embs = np.repeat(
        np.expand_dims(src_emb, axis=0), neighbor_embs.shape[0], axis=0
    )
    cosine_scores = cosine_similarity(src_embs, neighbor_embs).diagonal()
    return cosine_scores


def get_comparator_scores(
    src_emb: np.ndarray,
    neighbor_embs: np.ndarray,
    comparator_model: tp.Any,
    symmetrize_comparator: bool,
) -> np.ndarray:
    src_embs = np.repeat(
        np.expand_dims(src_emb, axis=0), neighbor_embs.shape[0], axis=0
    )
    a = torch.from_numpy(src_embs).unsqueeze(1)  # restore depth dim
    b = torch.from_numpy(neighbor_embs).unsqueeze(1)
    res = get_comparator_preds(a, b, comparator_model, symmetrize_comparator)
    scores_softmax = softmax(res)
    return np.array(scores_softmax)


def get_comparator_preds(
    src_emb: torch.Tensor, tgt_emb: torch.Tensor, model: tp.Any, symmetrize: bool
) -> np.ndarray:
    preds = (
        get_model_pred(
            model,
            src=src_emb[:, 0],
            mt=tgt_emb[:, 0],
            use_gpu=model.use_gpu,
            batch_size=1,
        )[:, 0]
        .cpu()
        .numpy()
    )
    if symmetrize:
        preds2 = (
            get_model_pred(
                model,
                src=tgt_emb[:, 0],
                mt=src_emb[:, 0],
                use_gpu=model.use_gpu,
                batch_size=1,
            )[:, 0]
            .cpu()
            .numpy()
        )
        preds = (preds2 + preds) / 2
    return preds


def get_blended_predictions(
    alpha: float,
    nbex: int,
    margin_scores: np.ndarray,
    x_aux: np.ndarray,
    y_aux: np.ndarray,
    neighbor_indices: np.ndarray,
    comparator_model: tp.Optional[tp.Any] = None,
    symmetrize_comparator: bool = False,
) -> list[int]:
    predictions = []
    for src_index in range(nbex):
        neighbors = neighbor_indices[src_index]
        neighbor_embs = y_aux[neighbors].astype(np.float32)
        src_emb = x_aux[src_index].astype(np.float32)
        aux_scores = (
            get_comparator_scores(
                src_emb, neighbor_embs, comparator_model, symmetrize_comparator
            )
            if comparator_model
            else get_cosine_scores(src_emb, neighbor_embs)
        )
        assert margin_scores[src_index].shape == aux_scores.shape
        blended_scores = alpha * margin_scores[src_index] + (1 - alpha) * aux_scores
        blended_neighbor_idx = blended_scores.argmax()
        predictions.append(neighbors[blended_neighbor_idx])
    return predictions
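

# Worked sketch of the blending step above, using hypothetical numbers: with
# alpha = 0.7, margin scores [1.2, 1.1, 0.9] and auxiliary scores
# [0.2, 0.9, 0.4] for the k = 3 neighbors of one source sentence, the blended
# scores are 0.7 * margin + 0.3 * aux = [0.90, 1.04, 0.75], so the second
# neighbor wins even though it is not the margin argmax.
def _example_blend(alpha: float = 0.7) -> int:
    margin_scores = np.array([1.2, 1.1, 0.9])
    aux_scores = np.array([0.2, 0.9, 0.4])
    blended = alpha * margin_scores + (1 - alpha) * aux_scores
    return int(blended.argmax())  # -> 1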


def PxSIM(
    x: np.ndarray,
    y: np.ndarray,
    x_aux: np.ndarray,
    y_aux: np.ndarray,
    alpha: float,
    margin: str = Margin.RATIO.value,
    k: int = 16,
    comparator_path: tp.Optional[Path] = None,
    symmetrize_comparator: bool = False,
) -> tp.Tuple[int, int, list[int]]:
    """
    Parameters
    ----------
    x : np.ndarray
        source-side embedding array
    y : np.ndarray
        target-side embedding array
    x_aux : np.ndarray
        source-side embedding array using auxiliary model
    y_aux : np.ndarray
        target-side embedding array using auxiliary model
    alpha : float
        parameter to weight the blended score
    margin : str
        margin scoring function (e.g. ratio, absolute, distance)
    k : int
        number of neighbors in k-nn search
    comparator_path : Path
        path to AutoPCP model config
    symmetrize_comparator : bool
        whether to symmetrize the comparator predictions

    Returns
    -------
    err : int
        Number of errors
    nbex : int
        Number of examples
    preds : list[int]
        List of (index-based) predictions
    """
    assert Margin.has_value(margin), f"Margin type: {margin}, is not supported."
    comparator_model = Comparator.load(comparator_path) if comparator_path else None
    # get margin-based nearest neighbors
    margin_scores, neighbor_indices, nbex = get_neighbors(x, y, k=k, margin=margin)
    preds = get_blended_predictions(
        alpha,
        nbex,
        margin_scores,
        x_aux,
        y_aux,
        neighbor_indices,
        comparator_model,
        symmetrize_comparator,
    )
    err = sum(idx != pred for idx, pred in enumerate(preds))
    print(f"P-xSIM error: {100 * (err / nbex):.2f}")
    return err, nbex, preds
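

# Minimal usage sketch (not part of the original module): toy primary and
# auxiliary embeddings, cosine fallback (no AutoPCP comparator). The data,
# dimensions, and alpha are assumptions for illustration; the auxiliary
# embeddings may come from a different model and have a different dimension.
def _example_pxsim(dim: int = 32, aux_dim: int = 16, n: int = 50) -> None:
    rng = np.random.RandomState(0)
    y_toy = rng.rand(n, dim).astype(np.float32)
    x_toy = (y_toy + 0.01 * rng.rand(n, dim)).astype(np.float32)
    y_aux_toy = rng.rand(n, aux_dim).astype(np.float32)
    x_aux_toy = (y_aux_toy + 0.01 * rng.rand(n, aux_dim)).astype(np.float32)
    err, nbex, preds = PxSIM(x_toy, y_toy, x_aux_toy, y_aux_toy, alpha=0.5, k=4)
    assert len(preds) == nbex == n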


def load_embeddings(
    infile: Path, dim: int, fp16: bool = False, numpy_header: bool = False
) -> np.ndarray:
    assert infile.exists(), f"file: {infile} does not exist."
    if numpy_header:
        return np.load(infile)
    emb = np.fromfile(infile, dtype=np.float16 if fp16 else np.float32)
    num_examples = emb.shape[0] // dim
    emb.resize(num_examples, dim)
    if fp16:
        emb = emb.astype(np.float32)  # faiss currently only supports fp32
    return emb
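

# Sketch (illustrative only) of the headerless binary layout load_embeddings
# expects when numpy_header is False: a flat fp32 (or fp16) buffer of
# n * dim values. The temporary file and shape below are assumptions.
def _example_load_embeddings() -> None:
    import tempfile

    tmp_path = Path(tempfile.mkdtemp()) / "toy_embeddings.bin"
    emb_out = np.random.rand(8, 1024).astype(np.float32)
    emb_out.tofile(tmp_path)  # raw dump, no numpy header
    emb_in = load_embeddings(tmp_path, dim=1024)
    assert emb_in.shape == (8, 1024)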


def run(
    src_emb: Path,
    tgt_emb: Path,
    src_aux_emb: Path,
    tgt_aux_emb: Path,
    alpha: float,
    margin: str = Margin.RATIO.value,
    k: int = 16,
    emb_fp16: bool = False,
    aux_emb_fp16: bool = False,
    emb_dim: int = 1024,
    aux_emb_dim: int = 1024,
    numpy_header: bool = False,
    comparator_path: tp.Optional[Path] = None,
    symmetrize_comparator: bool = False,
    prediction_savepath: tp.Optional[Path] = None,
) -> None:
    x = load_embeddings(src_emb, emb_dim, emb_fp16, numpy_header)
    y = load_embeddings(tgt_emb, emb_dim, emb_fp16, numpy_header)
    x_aux = load_embeddings(src_aux_emb, aux_emb_dim, aux_emb_fp16, numpy_header)
    y_aux = load_embeddings(tgt_aux_emb, aux_emb_dim, aux_emb_fp16, numpy_header)
    assert (x.shape == y.shape) and (x_aux.shape == y_aux.shape)
    _, _, preds = PxSIM(
        x, y, x_aux, y_aux, alpha, margin, k, comparator_path, symmetrize_comparator
    )
    if prediction_savepath:
        with open(prediction_savepath, "w") as outf:
            for pred in preds:
                print(pred, file=outf)
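

# Hypothetical command line (illustrative): func_argparse derives one
# subcommand per public function and its flags from the function signature,
# so an invocation could look roughly like the following; the script name
# and file paths are placeholders, not from the original source.
#
#   python p_xsim.py run \
#       --src_emb src.bin --tgt_emb tgt.bin \
#       --src_aux_emb src_aux.bin --tgt_aux_emb tgt_aux.bin \
#       --alpha 0.2 --k 16 --margin ratio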


if __name__ == "__main__":
    import func_argparse

    func_argparse.main()