import numpy as np
from src.dataloading import load_run_data
from lmsim.metrics import Metrics, Kappa_p, EC


def load_data_and_compute_similarities(
    models: list[str], dataset: str, metric_name: str
) -> np.ndarray:
    """Load run data for every model and compute their pairwise similarities.

    Args:
        models: Model identifiers; each is passed to ``load_run_data``
            together with ``dataset``.
        dataset: Dataset identifier forwarded to ``load_run_data``.
        metric_name: Name of the metric to use; see
            ``compute_pairwise_similarities`` for the accepted values.

    Returns:
        A ``(len(models), len(models))`` array of similarities, as produced
        by ``compute_pairwise_similarities``.

    Raises:
        ValueError: If ``metric_name`` is not a recognised metric.
    """
    # Load data
    probs = []
    gts = []
    for model in models:
        model_probs, model_gt = load_run_data(model, dataset)
        probs.append(model_probs)
        gts.append(model_gt)

    # Compute pairwise similarities
    return compute_pairwise_similarities(metric_name, probs, gts)


def compute_similarity(
    metric: Metrics,
    probs_a: list[np.ndarray],
    gt_a: list[int],
    probs_b: list[np.ndarray],
    gt_b: list[int],
) -> float:
    """Compute the similarity of two models on the responses they share.

    Only responses whose ground truth is identical for both models are kept;
    the filtered outputs and ground truths are then handed to
    ``metric.compute_k``.

    Args:
        metric: Metric object exposing ``compute_k(output_a, output_b, gt)``.
        probs_a: Per-response outputs of the first model.
        gt_a: Per-response ground truths of the first model.
        probs_b: Per-response outputs of the second model.
        gt_b: Per-response ground truths of the second model.

    Returns:
        Whatever ``metric.compute_k`` returns for the filtered responses
        (annotated as a float).

    Raises:
        AssertionError: If the two models have a different number of
            responses.
    """
    # Check that the models have the same number of responses.
    # Raised explicitly instead of via ``assert`` so the check is not
    # stripped under ``python -O``; the exception type and message are kept
    # so existing callers observe the same failure.
    if len(probs_a) != len(probs_b):
        raise AssertionError(
            f"Models must have the same number of responses: {len(probs_a)} != {len(probs_b)}"
        )

    # Only keep responses where the ground truth is the same
    output_a = []
    output_b = []
    gt = []
    for i, (response_a, response_b) in enumerate(zip(probs_a, probs_b)):
        # Ground truths are indexed (not zipped) so that a ground-truth list
        # shorter than the responses still fails loudly with IndexError.
        if gt_a[i] == gt_b[i]:
            output_a.append(response_a)
            output_b.append(response_b)
            gt.append(gt_a[i])

    # Compute the similarity with the chosen metric
    return metric.compute_k(output_a, output_b, gt)


def compute_pairwise_similarities(
    metric_name: str, probs: list[list[np.ndarray]], gts: list[list[int]]
) -> np.ndarray:
    """Build the symmetric matrix of similarities between all model pairs.

    Args:
        metric_name: One of ``"Kappa_p (prob.)"``, ``"Kappa_p (det.)"`` or
            ``"Error Consistency"``.
        probs: For each model, its list of per-response outputs.
        gts: For each model, its list of per-response ground truths.

    Returns:
        A ``(len(probs), len(probs))`` float array where entry ``[i, j]`` is
        ``compute_similarity`` of model ``i`` and model ``j``. Only the upper
        triangle (including the diagonal) is computed; the lower triangle is
        mirrored from it.

    Raises:
        ValueError: If ``metric_name`` is not one of the accepted names.
    """
    # Select chosen metric
    if metric_name == "Kappa_p (prob.)":
        metric = Kappa_p()
    elif metric_name == "Kappa_p (det.)":
        metric = Kappa_p(prob=False)
        # Convert probabilities to one-hot (rebinds the local name only; the
        # caller's lists are not modified)
        probs = [[one_hot(p) for p in model_probs] for model_probs in probs]
    elif metric_name == "Error Consistency":
        metric = EC()
    else:
        raise ValueError(f"Invalid metric: {metric_name}")

    n_models = len(probs)
    similarities = np.zeros((n_models, n_models))
    for i in range(n_models):
        # Start at i: the diagonal (a model against itself) is computed too.
        for j in range(i, n_models):
            similarities[i, j] = compute_similarity(
                metric, probs[i], gts[i], probs[j], gts[j]
            )
            similarities[j, i] = similarities[i, j]
    return similarities


def one_hot(probs: np.ndarray) -> np.ndarray:
    """Return an array like ``probs`` that is 1 at the argmax and 0 elsewhere.

    The result has the same shape and dtype as ``probs`` (``np.zeros_like``).
    NOTE(review): ``np.argmax`` returns a flat index, so this presumably
    expects a 1-D vector -- confirm against callers.
    """
    # Named ``encoded`` so the local does not shadow this function's name.
    encoded = np.zeros_like(probs)
    encoded[np.argmax(probs)] = 1
    return encoded