# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor

from mmengine.dist import (broadcast_object_list, collect_results,
                           is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement
class BaseMetric(metaclass=ABCMeta):
    """Base class for a metric.

    The metric first processes each batch of data_samples and predictions,
    and appends the processed results to the results list. Then it
    collects all results together from all ranks if distributed training
    is used. Finally, it computes the metrics of the entire dataset.

    A subclass of class:`BaseMetric` should assign a meaningful value to the
    class attribute `default_prefix`. See the argument `prefix` for details.

    Args:
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Default: None
        collect_dir: (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
    """

    default_prefix: Optional[str] = None

    def __init__(self,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 collect_dir: Optional[str] = None) -> None:
        # A shared tmpdir only makes sense for CPU gathering; GPU gathering
        # uses collective communication instead of the filesystem.
        if collect_dir is not None and collect_device != 'cpu':
            raise ValueError('`collect_dir` could only be configured when '
                             "`collect_device='cpu'`")

        self._dataset_meta: Union[None, dict] = None
        self.collect_device = collect_device
        # Per-batch processed results; filled by `process`, consumed and
        # cleared by `evaluate`.
        self.results: List[Any] = []
        self.prefix = prefix or self.default_prefix
        self.collect_dir = collect_dir

        if self.prefix is None:
            print_log(
                'The prefix is not set in metric class '
                f'{self.__class__.__name__}.',
                logger='current',
                level=logging.WARNING)

    @property
    def dataset_meta(self) -> Optional[dict]:
        """Optional[dict]: Meta info of the dataset."""
        return self._dataset_meta

    @dataset_meta.setter
    def dataset_meta(self, dataset_meta: dict) -> None:
        """Set the dataset meta info to the metric."""
        self._dataset_meta = dataset_meta

    @abstractmethod
    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Any): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of outputs from
                the model.
        """

    @abstractmethod
    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """

    def evaluate(self, size: int) -> dict:
        """Evaluate the model performance of the whole dataset after processing
        all batches.

        Args:
            size (int): Length of the entire validation dataset. When batch
                size > 1, the dataloader may pad some data samples to make
                sure all ranks have the same length of dataset slice. The
                ``collect_results`` function will drop the padded data based on
                this size.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are the
            names of the metrics, and the values are corresponding results.
        """
        if len(self.results) == 0:
            print_log(
                f'{self.__class__.__name__} got empty `self.results`. Please '
                'ensure that the processed results are properly added into '
                '`self.results` in `process` method.',
                logger='current',
                level=logging.WARNING)

        if self.collect_device == 'cpu':
            results = collect_results(
                self.results,
                size,
                self.collect_device,
                tmpdir=self.collect_dir)
        else:
            results = collect_results(self.results, size, self.collect_device)

        if is_main_process():
            # cast all tensors in results list to cpu
            results = _to_cpu(results)
            _metrics = self.compute_metrics(results)  # type: ignore
            # Add prefix to metric names
            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]  # type: ignore
        # Non-main ranks receive the computed metrics from the main rank.
        broadcast_object_list(metrics)

        # reset the results list
        self.results.clear()
        return metrics[0]
@METRICS.register_module()
class DumpResults(BaseMetric):
    """Dump model predictions to a pickle file for offline evaluation.

    Args:
        out_file_path (str): Path of the dumped file. Must end with '.pkl'
            or '.pickle'.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        collect_dir: (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
    """

    def __init__(self,
                 out_file_path: str,
                 collect_device: str = 'cpu',
                 collect_dir: Optional[str] = None) -> None:
        super().__init__(
            collect_device=collect_device, collect_dir=collect_dir)
        # Fail fast on an unsupported extension instead of at dump time.
        if not out_file_path.endswith(('.pkl', '.pickle')):
            raise ValueError('The output file must be a pkl file.')
        self.out_file_path = out_file_path

    def process(self, data_batch: Any, predictions: Sequence[dict]) -> None:
        """transfer tensors in predictions to CPU."""
        self.results.extend(_to_cpu(predictions))

    def compute_metrics(self, results: list) -> dict:
        """dump the prediction results to a pickle file."""
        dump(results, self.out_file_path)
        print_log(
            f'Results has been saved to {self.out_file_path}.',
            logger='current')
        # No metrics are computed; the dump itself is the result.
        return {}
def _to_cpu(data: Any) -> Any:
    """Recursively move every tensor and BaseDataElement in ``data`` to CPU.

    Lists, tuples and dicts are rebuilt with their elements converted;
    any other object is returned untouched.
    """
    if isinstance(data, (Tensor, BaseDataElement)):
        return data.to('cpu')
    if isinstance(data, dict):
        return {key: _to_cpu(value) for key, value in data.items()}
    if isinstance(data, (list, tuple)):
        converted = [_to_cpu(item) for item in data]
        # Preserve the container type of the input.
        return converted if isinstance(data, list) else tuple(converted)
    return data