Spaces:
Sleeping
Sleeping
| # Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """TODO: Add a description here.""" | |
| import math | |
| import statistics | |
| from typing import List, Optional, Union | |
| import datasets | |
| import evaluate | |
| import numpy as np | |
| # TODO: Add BibTeX citation | |
| _CITATION = """\ | |
| @InProceedings{huggingface:module, | |
| title = {A great new module}, | |
| authors={huggingface, Inc.}, | |
| year={2020} | |
| } | |
| """ | |
| # TODO: Add description of the module here | |
| _DESCRIPTION = """\ | |
| This new module is designed to solve this great ML task and is crafted with a lot of care. | |
| """ | |
| # TODO: Add description of the arguments of the module here | |
| _KWARGS_DESCRIPTION = """ | |
| Calculates how good are predictions given some references, using certain scores | |
| Args: | |
| predictions: list of generated time series. | |
| shape: (num_generation, num_timesteps, num_features) | |
| references: list of reference | |
| shape: (num_reference, num_timesteps, num_features) | |
| Returns: | |
| Examples: | |
| Examples should be written in doctest format, and should illustrate how | |
| to use the function. | |
| >>> my_new_module = evaluate.load("bowdbeg/matching_series") | |
| >>> results = my_new_module.compute(references=[[[0.0, 1.0]]], predictions=[[[0.0, 1.0]]]) | |
| >>> print(results) | |
| {'matchin': 1.0} | |
| """ | |
| class matching_series(evaluate.Metric): | |
| """TODO: Short description of my evaluation module.""" | |
| def _info(self): | |
| # TODO: Specifies the evaluate.EvaluationModuleInfo object | |
| return evaluate.MetricInfo( | |
| # This is the description that will appear on the modules page. | |
| module_type="metric", | |
| description=_DESCRIPTION, | |
| citation=_CITATION, | |
| inputs_description=_KWARGS_DESCRIPTION, | |
| # This defines the format of each prediction and reference | |
| features=datasets.Features( | |
| { | |
| "predictions": datasets.Sequence(datasets.Sequence(datasets.Value("float"))), | |
| "references": datasets.Sequence(datasets.Sequence(datasets.Value("float"))), | |
| } | |
| ), | |
| # Homepage of the module for documentation | |
| homepage="https://huggingface.co/spaces/bowdbeg/matching_series", | |
| # Additional links to the codebase or references | |
| codebase_urls=["http://github.com/path/to/codebase/of/new_module"], | |
| reference_urls=["http://path.to.reference.url/new_module"], | |
| ) | |
| def _download_and_prepare(self, dl_manager): | |
| """Optional: download external resources useful to compute the scores""" | |
| pass | |
| def compute(self, *, predictions=None, references=None, **kwargs) -> Optional[dict]: | |
| """Compute the evaluation module. | |
| Usage of positional arguments is not allowed to prevent mistakes. | |
| Args: | |
| predictions (`list/array/tensor`, *optional*): | |
| Predictions. | |
| references (`list/array/tensor`, *optional*): | |
| References. | |
| **kwargs (optional): | |
| Keyword arguments that will be forwarded to the evaluation module [`~evaluate.EvaluationModule.compute`] | |
| method (see details in the docstring). | |
| Return: | |
| `dict` or `None` | |
| - Dictionary with the results if this evaluation module is run on the main process (`process_id == 0`). | |
| - `None` if the evaluation module is not run on the main process (`process_id != 0`). | |
| ```py | |
| >>> import evaluate | |
| >>> accuracy = evaluate.load("accuracy") | |
| >>> accuracy.compute(predictions=[0, 1, 1, 0], references=[0, 1, 0, 1]) | |
| ``` | |
| """ | |
| all_kwargs = {"predictions": predictions, "references": references, **kwargs} | |
| if predictions is None and references is None: | |
| missing_kwargs = {k: None for k in self._feature_names() if k not in all_kwargs} | |
| all_kwargs.update(missing_kwargs) | |
| else: | |
| missing_inputs = [k for k in self._feature_names() if k not in all_kwargs] | |
| if missing_inputs: | |
| raise ValueError( | |
| f"Evaluation module inputs are missing: {missing_inputs}. All required inputs are {list(self._feature_names())}" | |
| ) | |
| inputs = {input_name: all_kwargs[input_name] for input_name in self._feature_names()} | |
| compute_kwargs = {k: kwargs[k] for k in kwargs if k not in self._feature_names()} | |
| return self._compute(**inputs, **compute_kwargs) | |
| def _compute( | |
| self, | |
| predictions: Union[List, np.ndarray], | |
| references: Union[List, np.ndarray], | |
| batch_size: Optional[int] = None, | |
| cuc_n_calculation: int = 3, | |
| cuc_n_samples: Union[List[int], str] = "auto", | |
| ): | |
| """ | |
| Compute the scores of the module given the predictions and references | |
| Args: | |
| predictions: list of generated time series. | |
| shape: (num_generation, num_timesteps, num_features) | |
| references: list of reference | |
| shape: (num_reference, num_timesteps, num_features) | |
| batch_size: batch size to use for the computation. If None, the whole dataset is processed at once. | |
| cuc_n_calculation: number of Coverage Under Curve calculate times | |
| cuc_n_samples: number of samples to use for Coverage Under Curve calculation. If "auto", it uses the number of samples of the predictions. | |
| Returns: | |
| """ | |
| predictions = np.array(predictions) | |
| references = np.array(references) | |
| if predictions.shape[1:] != references.shape[1:]: | |
| raise ValueError( | |
| "The number of features in the predictions and references should be the same. predictions: {}, references: {}".format( | |
| predictions.shape[1:], references.shape[1:] | |
| ) | |
| ) | |
| # at first, convert the inputs to numpy arrays | |
| # MSE between predictions and references for all example combinations for each features | |
| # shape: (num_generation, num_reference, num_features) | |
| if batch_size is not None: | |
| mse = np.zeros((len(predictions), len(references), predictions.shape[-1])) | |
| # iterate over the predictions and references in batches | |
| for i in range(0, len(predictions) + batch_size, batch_size): | |
| for j in range(0, len(references) + batch_size, batch_size): | |
| mse[i : i + batch_size, j : j + batch_size] = np.mean( | |
| (predictions[i : i + batch_size, None] - references[None, j : j + batch_size]) ** 2, axis=-2 | |
| ) | |
| else: | |
| mse = np.mean((predictions[:, None] - references) ** 2, axis=1) | |
| index_mse = mse.diagonal(axis1=0, axis2=1).mean() | |
| # matching scores | |
| mse_mean = mse.mean(axis=-1) | |
| # best match for each generated time series | |
| # shape: (num_generation,) | |
| best_match = np.argmin(mse_mean, axis=-1) | |
| # matching mse | |
| # shape: (num_generation,) | |
| precision_mse = mse_mean[np.arange(len(best_match)), best_match].mean() | |
| # best match for each reference time series | |
| # shape: (num_reference,) | |
| best_match_inv = np.argmin(mse_mean, axis=0) | |
| recall_mse = mse_mean[best_match_inv, np.arange(len(best_match_inv))].mean() | |
| f1_mse = 2 / (1 / precision_mse + 1 / recall_mse) | |
| # matching precision, recall and f1 | |
| matching_recall = np.unique(best_match).size / len(best_match_inv) | |
| matching_precision = np.unique(best_match_inv).size / len(best_match) | |
| matching_f1 = 2 / (1 / matching_precision + 1 / matching_recall) | |
| # take matching for each feature and compute metrics for them | |
| precision_mse_features = [] | |
| recall_mse_features = [] | |
| f1_mse_features = [] | |
| matching_precision_features = [] | |
| matching_recall_features = [] | |
| matching_f1_features = [] | |
| index_mse_features = [] | |
| coverages_features = [] | |
| cuc_features = [] | |
| for f in range(predictions.shape[-1]): | |
| mse_f = mse[:, :, f] | |
| index_mse_f = mse_f.diagonal(axis1=0, axis2=1).mean() | |
| best_match_f = np.argmin(mse_f, axis=-1) | |
| precision_mse_f = mse_f[np.arange(len(best_match_f)), best_match_f].mean() | |
| best_match_inv_f = np.argmin(mse_f, axis=0) | |
| recall_mse_f = mse_f[best_match_inv_f, np.arange(len(best_match_inv_f))].mean() | |
| f1_mse_f = 2 / (1 / precision_mse_f + 1 / recall_mse_f) | |
| precision_mse_features.append(precision_mse_f) | |
| recall_mse_features.append(recall_mse_f) | |
| f1_mse_features.append(f1_mse_f) | |
| index_mse_features.append(index_mse_f) | |
| matching_recall_f = np.unique(best_match_f).size / len(best_match_f) | |
| matching_precision_f = np.unique(best_match_inv_f).size / len(best_match_inv_f) | |
| matching_f1_f = 2 / (1 / matching_precision_f + 1 / matching_recall_f) | |
| matching_precision_features.append(matching_precision_f) | |
| matching_recall_features.append(matching_recall_f) | |
| matching_f1_features.append(matching_f1_f) | |
| coverages_f, cuc_f = self.compute_cuc(best_match_f, len(references), cuc_n_calculation, cuc_n_samples) | |
| coverages_features.append(coverages_f) | |
| cuc_features.append(cuc_f) | |
| macro_precision_mse = statistics.mean(precision_mse_features) | |
| macro_recall_mse = statistics.mean(recall_mse_features) | |
| macro_f1_mse = statistics.mean(f1_mse_features) | |
| macro_index_mse = statistics.mean(index_mse_features) | |
| macro_matching_precision = statistics.mean(matching_precision_features) | |
| macro_matching_recall = statistics.mean(matching_recall_features) | |
| macro_matching_f1 = statistics.mean(matching_f1_features) | |
| # cuc | |
| coverages, cuc = self.compute_cuc(best_match, len(references), cuc_n_calculation, cuc_n_samples) | |
| macro_cuc = statistics.mean(cuc_features) | |
| macro_coverages = [statistics.mean(c) for c in zip(*coverages_features)] | |
| return { | |
| "precision_mse": precision_mse, | |
| "f1_mse": f1_mse, | |
| "recall_mse": recall_mse, | |
| "index_mse": index_mse, | |
| "precision_mse_features": precision_mse_features, | |
| "f1_mse_features": f1_mse_features, | |
| "recall_mse_features": recall_mse_features, | |
| "index_mse_features": index_mse_features, | |
| "macro_precision_mse": macro_precision_mse, | |
| "macro_recall_mse": macro_recall_mse, | |
| "macro_f1_mse": macro_f1_mse, | |
| "macro_index_mse": macro_index_mse, | |
| "matching_precision": matching_precision, | |
| "matching_recall": matching_recall, | |
| "matching_f1": matching_f1, | |
| "matching_precision_features": matching_precision_features, | |
| "matching_recall_features": matching_recall_features, | |
| "matching_f1_features": matching_f1_features, | |
| "macro_matching_precision": macro_matching_precision, | |
| "macro_matching_recall": macro_matching_recall, | |
| "macro_matching_f1": macro_matching_f1, | |
| "cuc": cuc, | |
| "coverages": coverages, | |
| "macro_cuc": macro_cuc, | |
| "macro_coverages": macro_coverages, | |
| "cuc_features": cuc_features, | |
| "coverages_features": coverages_features, | |
| } | |
| def compute_cuc( | |
| self, | |
| match: np.ndarray, | |
| n_reference: int, | |
| n_calculation: int, | |
| n_samples: Union[List[int], str], | |
| ): | |
| """ | |
| Compute Coverage Under Curve | |
| Args: | |
| match: best match for each generated time series | |
| n_reference: number of reference time series | |
| n_calculation: number of Coverage Under Curve calculate times | |
| n_samples: number of samples to use for Coverage Under Curve calculation. If "auto", it uses the number of samples of the predictions. | |
| Returns: | |
| """ | |
| n_generaiton = len(match) | |
| if n_samples == "auto": | |
| exp = int(math.log2(n_generaiton)) | |
| n_samples = [int(2**i) for i in range(exp)] | |
| n_samples.append(n_generaiton) | |
| assert isinstance(n_samples, list) and all(isinstance(n, int) for n in n_samples) | |
| coverages = [] | |
| for n_sample in n_samples: | |
| coverage = 0 | |
| for _ in range(n_calculation): | |
| sample = np.random.choice(match, size=n_sample, replace=False) # type: ignore | |
| coverage += len(np.unique(sample)) / n_reference | |
| coverages.append(coverage / n_calculation) | |
| cuc = np.trapz(coverages, n_samples) / len(n_samples) / max(n_samples) | |
| return coverages, cuc | |