# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor

from mmengine.dist import (broadcast_object_list, collect_results,
                           is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement


class BaseMetric(metaclass=ABCMeta):
"""Base class for a metric.
The metric first processes each batch of data_samples and predictions,
and appends the processed results to the results list. Then it
collects all results together from all ranks if distributed training
is used. Finally, it computes the metrics of the entire dataset.
A subclass of class:`BaseMetric` should assign a meaningful value to the
class attribute `default_prefix`. See the argument `prefix` for details.
Args:
collect_device (str): Device name used for collecting results from
different ranks during distributed training. Must be 'cpu' or
'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added to the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, ``self.default_prefix``
            will be used instead. Defaults to None.
        collect_dir (str, optional): Synchronization directory for collecting
            data from different ranks. This argument should only be
            configured when ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
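
    Examples:
        >>> # A minimal sketch of a subclass. The ``ToyAccuracy`` name and
        >>> # the ``'pred'``/``'gt'`` sample keys are illustrative
        >>> # assumptions, not part of this module.
        >>> class ToyAccuracy(BaseMetric):
        ...     default_prefix = 'toy'
        ...
        ...     def process(self, data_batch, data_samples):
        ...         # Store only what compute_metrics needs per sample.
        ...         for sample in data_samples:
        ...             self.results.append(sample['pred'] == sample['gt'])
        ...
        ...     def compute_metrics(self, results):
        ...         return {'accuracy': sum(results) / len(results)}
        >>> # evaluate() then reports the value under the key 'toy/accuracy'.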
"""

    default_prefix: Optional[str] = None

    def __init__(self,
collect_device: str = 'cpu',
prefix: Optional[str] = None,
collect_dir: Optional[str] = None) -> None:
if collect_dir is not None and collect_device != 'cpu':
            raise ValueError('`collect_dir` can only be configured when '
                             "`collect_device='cpu'`")
self._dataset_meta: Union[None, dict] = None
self.collect_device = collect_device
self.results: List[Any] = []
self.prefix = prefix or self.default_prefix
self.collect_dir = collect_dir

        if self.prefix is None:
print_log(
'The prefix is not set in metric class '
f'{self.__class__.__name__}.',
logger='current',
level=logging.WARNING)

    @property
def dataset_meta(self) -> Optional[dict]:
"""Optional[dict]: Meta info of the dataset."""
return self._dataset_meta

    @dataset_meta.setter
def dataset_meta(self, dataset_meta: dict) -> None:
"""Set the dataset meta info to the metric."""
self._dataset_meta = dataset_meta

    @abstractmethod
def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
"""Process one batch of data samples and predictions. The processed
results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
data_batch (Any): A batch of data from the dataloader.
data_samples (Sequence[dict]): A batch of outputs from
the model.
"""

    @abstractmethod
def compute_metrics(self, results: list) -> dict:
"""Compute the metrics from processed results.
Args:
results (list): The processed results of each batch.
Returns:
dict: The computed metrics. The keys are the names of the metrics,
and the values are corresponding results.
"""

    def evaluate(self, size: int) -> dict:
"""Evaluate the model performance of the whole dataset after processing
        all batches.

        Args:
size (int): Length of the entire validation dataset. When batch
size > 1, the dataloader may pad some data samples to make
sure all ranks have the same length of dataset slice. The
``collect_results`` function will drop the padded data based on
                this size.

        Returns:
dict: Evaluation metrics dict on the val dataset. The keys are the
names of the metrics, and the values are corresponding results.
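
        Examples:
            >>> # A sketch of the key naming; the values are assumptions.
            >>> # With ``prefix='coco'`` and ``compute_metrics`` returning
            >>> # {'mAP': 0.6}, evaluate() returns {'coco/mAP': 0.6}.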
"""
if len(self.results) == 0:
print_log(
f'{self.__class__.__name__} got empty `self.results`. Please '
'ensure that the processed results are properly added into '
'`self.results` in `process` method.',
logger='current',
level=logging.WARNING)
if self.collect_device == 'cpu':
results = collect_results(
self.results,
size,
self.collect_device,
tmpdir=self.collect_dir)
else:
results = collect_results(self.results, size, self.collect_device)

        if is_main_process():
# cast all tensors in results list to cpu
results = _to_cpu(results)
_metrics = self.compute_metrics(results) # type: ignore
# Add prefix to metric names
if self.prefix:
_metrics = {
'/'.join((self.prefix, k)): v
for k, v in _metrics.items()
}
metrics = [_metrics]
else:
metrics = [None] # type: ignore
broadcast_object_list(metrics)
# reset the results list
self.results.clear()
return metrics[0]


@METRICS.register_module()
class DumpResults(BaseMetric):
"""Dump model predictions to a pickle file for offline evaluation.
Args:
out_file_path (str): Path of the dumped file. Must end with '.pkl'
or '.pickle'.
collect_device (str): Device name used for collecting results from
different ranks during distributed training. Must be 'cpu' or
'gpu'. Defaults to 'cpu'.
        collect_dir (str, optional): Synchronization directory for collecting
            data from different ranks. This argument should only be
            configured when ``collect_device`` is 'cpu'. Defaults to None.
`New in version 0.7.3.`
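
    Examples:
        >>> # An illustrative config snippet; the surrounding runner setup
        >>> # is assumed and not shown here.
        >>> val_evaluator = dict(
        ...     type='DumpResults', out_file_path='./predictions.pkl')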
"""

    def __init__(self,
out_file_path: str,
collect_device: str = 'cpu',
collect_dir: Optional[str] = None) -> None:
super().__init__(
collect_device=collect_device, collect_dir=collect_dir)
if not out_file_path.endswith(('.pkl', '.pickle')):
            raise ValueError('The output file must end with ".pkl" or '
                             '".pickle".')
self.out_file_path = out_file_path

    def process(self, data_batch: Any, predictions: Sequence[dict]) -> None:
        """Transfer tensors in predictions to CPU."""
self.results.extend(_to_cpu(predictions))

    def compute_metrics(self, results: list) -> dict:
        """Dump the prediction results to a pickle file."""
dump(results, self.out_file_path)
print_log(
            f'Results have been saved to {self.out_file_path}.',
logger='current')
return {}


def _to_cpu(data: Any) -> Any:
    """Recursively transfer all tensors and ``BaseDataElement``s to CPU."""
if isinstance(data, (Tensor, BaseDataElement)):
return data.to('cpu')
elif isinstance(data, list):
return [_to_cpu(d) for d in data]
elif isinstance(data, tuple):
return tuple(_to_cpu(d) for d in data)
elif isinstance(data, dict):
return {k: _to_cpu(v) for k, v in data.items()}
else:
return data