from __future__ import annotations

import copy
from enum import Enum

from packaging.version import Version
import numpy as np
from datashader.datashape import dshape, isnumeric, Record, Option
from datashader.datashape import coretypes as ct
from toolz import concat, unique
import xarray as xr

from datashader.antialias import AntialiasCombination, AntialiasStage2
from datashader.utils import isminus1, isnull
from numba import cuda as nb_cuda

try:
    from datashader.transfer_functions._cuda_utils import (
        cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args, cuda_row_min_in_place,
        cuda_nanmax_n_in_place_4d, cuda_nanmax_n_in_place_3d,
        cuda_nanmin_n_in_place_4d, cuda_nanmin_n_in_place_3d,
        cuda_row_max_n_in_place_4d, cuda_row_max_n_in_place_3d,
        cuda_row_min_n_in_place_4d, cuda_row_min_n_in_place_3d, cuda_shift_and_insert,
    )
except ImportError:
    (cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args, cuda_row_min_in_place,
     cuda_nanmax_n_in_place_4d, cuda_nanmax_n_in_place_3d,
     cuda_nanmin_n_in_place_4d, cuda_nanmin_n_in_place_3d,
     cuda_row_max_n_in_place_4d, cuda_row_max_n_in_place_3d,
     cuda_row_min_n_in_place_4d, cuda_row_min_n_in_place_3d, cuda_shift_and_insert,
     ) = None, None, None, None, None, None, None, None, None, None, None, None, None

try:
    import cudf
    import cupy as cp
except Exception:
    cudf = cp = None

from .utils import (
    Expr, ngjit, nansum_missing, nanmax_in_place, nansum_in_place, row_min_in_place,
    nanmax_n_in_place_4d, nanmax_n_in_place_3d, nanmin_n_in_place_4d, nanmin_n_in_place_3d,
    row_max_n_in_place_4d, row_max_n_in_place_3d, row_min_n_in_place_4d, row_min_n_in_place_3d,
    shift_and_insert,
)


class SpecialColumn(Enum):
    """
    Internally datashader identifies the columns required by the user's
    Reductions and extracts them from the supplied source (e.g. DataFrame) to
    pass through the dynamically-generated append function in compiler.py and
    end up as arguments to the Reduction._append* functions. Each column is
    a string name or a SpecialColumn. A column of None is used in Reduction
    classes to denote that no column is required.
    """
    RowIndex = 1


class UsesCudaMutex(Enum):
    """
    Enum that encapsulates the need for a Reduction to use a CUDA mutex to
    operate correctly on a GPU. Possible values:

    No: the Reduction append_cuda function is atomic and no mutex is required.
    Local: Reduction append_cuda needs wrapping in a mutex.
    Global: the overall compiled append function needs wrapping in a mutex.
    """
    No = 0
    Local = 1
    Global = 2


class Preprocess(Expr):
    """Base class for preprocessing steps."""
    def __init__(self, column: str | SpecialColumn | None):
        self.column = column

    def inputs(self):
        return (self.column,)

    def nan_check_column(self):
        return None


class extract(Preprocess):
    """Extract a column from a dataframe as a numpy array of values."""
    def apply(self, df, cuda):
        if self.column is SpecialColumn.RowIndex:
            attr_name = "_datashader_row_offset"
            if isinstance(df, xr.Dataset):
                row_offset = df.attrs[attr_name]
                row_length = df.attrs["_datashader_row_length"]
            else:
                attrs = getattr(df, "attrs", None)
                row_offset = getattr(attrs or df, attr_name, 0)
                row_length = len(df)

        if cudf and isinstance(df, cudf.DataFrame):
            if self.column is SpecialColumn.RowIndex:
                return cp.arange(row_offset, row_offset + row_length, dtype=np.int64)

            if df[self.column].dtype.kind == 'f':
                nullval = np.nan
            else:
                nullval = 0
            if Version(cudf.__version__) >= Version("22.02"):
                return df[self.column].to_cupy(na_value=nullval)
            return cp.array(df[self.column].to_gpu_array(fillna=nullval))
        elif self.column is SpecialColumn.RowIndex:
            if cuda:
                return cp.arange(row_offset, row_offset + row_length, dtype=np.int64)
            else:
                return np.arange(row_offset, row_offset + row_length, dtype=np.int64)
        elif isinstance(df, xr.Dataset):
            if cuda and not isinstance(df[self.column].data, cp.ndarray):
                return cp.asarray(df[self.column])
            else:
                return df[self.column].data
        else:
            return df[self.column].values


class CategoryPreprocess(Preprocess):
    """Base class for categorizing preprocessors."""
    def cat_column(self):
        """Returns name of categorized column"""
        return self.column

    def categories(self, input_dshape):
        """Returns list of categories corresponding to input shape"""
        raise NotImplementedError("categories not implemented")

    def validate(self, in_dshape):
        """Validates input shape"""
        raise NotImplementedError("validate not implemented")

    def apply(self, df, cuda):
        """Applies preprocessor to DataFrame and returns array"""
        raise NotImplementedError("apply not implemented")


class category_codes(CategoryPreprocess):
    """
    Extract just the category codes from a categorical column.

    To create a new type of categorizer, derive a subclass from this
    class or one of its subclasses, implementing ``__init__``,
    ``_hashable_inputs``, ``categories``, ``validate``, and ``apply``.
    See the implementation of ``category_modulo`` in ``reductions.py``
    for an example.
    """
    def categories(self, input_dshape):
        return input_dshape.measure[self.column].categories

    def validate(self, in_dshape):
        if self.column not in in_dshape.dict:
            raise ValueError("specified column not found")
        if not isinstance(in_dshape.measure[self.column], ct.Categorical):
            raise ValueError("input must be categorical")

    def apply(self, df, cuda):
        if cudf and isinstance(df, cudf.DataFrame):
            if Version(cudf.__version__) >= Version("22.02"):
                return df[self.column].cat.codes.to_cupy()
            return df[self.column].cat.codes.to_gpu_array()
        else:
            return df[self.column].cat.codes.values


class category_modulo(category_codes):
    """
    A variation on category_codes that assigns categories using an integer column, modulo a base.

    Category is computed as (column_value - offset) % modulo.
    """
    # couldn't find anything in the datashape docs about how to check if a CType is an integer,
    # so just define a big set
    IntegerTypes = {ct.bool_, ct.uint8, ct.uint16, ct.uint32, ct.uint64, ct.int8, ct.int16,
                    ct.int32, ct.int64}

    def __init__(self, column, modulo, offset=0):
        super().__init__(column)
        self.offset = offset
        self.modulo = modulo

    def _hashable_inputs(self):
        return super()._hashable_inputs() + (self.offset, self.modulo)

    def categories(self, in_dshape):
        return list(range(self.modulo))

    def validate(self, in_dshape):
        if self.column not in in_dshape.dict:
            raise ValueError("specified column not found")
        if in_dshape.measure[self.column] not in self.IntegerTypes:
            raise ValueError("input must be an integer column")

    def apply(self, df, cuda):
        result = (df[self.column] - self.offset) % self.modulo
        if cudf and isinstance(df, cudf.Series):
            if Version(cudf.__version__) >= Version("22.02"):
                return result.to_cupy()
            return result.to_gpu_array()
        else:
            return result.values


class category_binning(category_modulo):
    """
    A variation on category_codes that assigns categories by binning a continuous-valued column.

    The number of categories returned is always nbins+1.
    The last category (nbins) is for NaNs in the data column, as well as for values under/over
    the binned interval (when include_under or include_over is False).

    Parameters
    ----------
    column: column to use
    lower: lower bound of first bin
    upper: upper bound of last bin
    nbins: number of bins
    include_under: if True, values below bin 0 are assigned to category 0
    include_over: if True, values above the last bin (nbins-1) are assigned to category nbins-1
    """
    def __init__(self, column, lower, upper, nbins, include_under=True, include_over=True):
        super().__init__(column, nbins + 1)  # +1 category for NaNs and clipped values
        self.bin0 = lower
        self.binsize = (upper - lower) / float(nbins)
        self.nbins = nbins
        self.bin_under = 0 if include_under else nbins
        self.bin_over = nbins - 1 if include_over else nbins

    def _hashable_inputs(self):
        return super()._hashable_inputs() + (self.bin0, self.binsize, self.bin_under,
                                             self.bin_over)

    def validate(self, in_dshape):
        if self.column not in in_dshape.dict:
            raise ValueError("specified column not found")

    def apply(self, df, cuda):
        if cudf and isinstance(df, cudf.DataFrame):
            if Version(cudf.__version__) >= Version("22.02"):
                values = df[self.column].to_cupy(na_value=cp.nan)
            else:
                values = cp.array(df[self.column].to_gpu_array(fillna=True))
            nan_values = cp.isnan(values)
        else:
            values = df[self.column].to_numpy()
            nan_values = np.isnan(values)

        index_float = (values - self.bin0) / self.binsize
        # NaN values are corrected below, so set them to zero to avoid warnings when
        # converting from float to int.
        index_float[nan_values] = 0
        index = index_float.astype(int)
        index[index < 0] = self.bin_under
        index[index >= self.nbins] = self.bin_over
        index[nan_values] = self.nbins
        return index
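
# Illustrative sketch (not part of the module) of how category_binning maps raw values to bin
# indices, assuming 4 bins over [0, 10); `cvs` and `df` are hypothetical names used only for
# this example.
#
#   >>> binner = category_binning('speed', lower=0, upper=10, nbins=4)   # doctest: +SKIP
#   >>> # values 0-2.5 -> 0, 2.5-5 -> 1, 5-7.5 -> 2, 7.5-10 -> 3, NaN -> 4 (extra category)
#   >>> agg = cvs.points(df, 'x', 'y', agg=by(binner, count()))          # doctest: +SKIP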


class category_values(CategoryPreprocess):
    """Extract a category and a value column from a dataframe as a (2, N) numpy array of values."""
    def __init__(self, categorizer, value_column):
        super().__init__(value_column)
        self.categorizer = categorizer

    def inputs(self):
        return (self.categorizer.column, self.column)

    def cat_column(self):
        """Returns name of categorized column"""
        return self.categorizer.column

    def categories(self, input_dshape):
        return self.categorizer.categories(input_dshape)

    def validate(self, in_dshape):
        return self.categorizer.validate(in_dshape)

    def apply(self, df, cuda):
        a = self.categorizer.apply(df, cuda)
        if cudf and isinstance(df, cudf.DataFrame):
            import cupy

            if self.column == SpecialColumn.RowIndex:
                nullval = -1
            elif df[self.column].dtype.kind == 'f':
                nullval = np.nan
            else:
                nullval = 0
            a = cupy.asarray(a)
            if self.column == SpecialColumn.RowIndex:
                b = extract(SpecialColumn.RowIndex).apply(df, cuda)
            elif Version(cudf.__version__) >= Version("22.02"):
                b = df[self.column].to_cupy(na_value=nullval)
            else:
                b = cupy.asarray(df[self.column].fillna(nullval))
            return cupy.stack((a, b), axis=-1)
        else:
            if self.column == SpecialColumn.RowIndex:
                b = extract(SpecialColumn.RowIndex).apply(df, cuda)
            else:
                b = df[self.column].values
            return np.stack((a, b), axis=-1)


class Reduction(Expr):
    """Base class for per-bin reductions."""
    def __init__(self, column: str | SpecialColumn | None = None):
        self.column = column
        self._nan_check_column = None

    def nan_check_column(self):
        if self._nan_check_column is not None:
            return extract(self._nan_check_column)
        else:
            return None

    def uses_cuda_mutex(self) -> UsesCudaMutex:
        """Return a ``UsesCudaMutex`` value describing whether this Reduction
        needs a CUDA mutex to be threadsafe across CUDA threads.

        If the CUDA append functions are all atomic (i.e. they only use
        functions from the ``numba.cuda.atomic`` module) then no mutex is
        needed (``UsesCudaMutex.No``); otherwise the append function needs
        wrapping in a local or global mutex.
        """
        return UsesCudaMutex.No

    def uses_row_index(self, cuda, partitioned):
        """Return ``True`` if this Reduction uses a row index virtual column.

        For some reductions the order of the rows of supplied data is
        important. These include ``first`` and ``last`` reductions as well as
        ``where`` reductions that return a row index. In some situations the
        order is intrinsic, such as ``first`` reductions that are processed
        sequentially (i.e. on a CPU without using Dask), and no extra column
        is required. But in situations of parallel processing (using a GPU or
        Dask) extra information is needed, and it is provided by a row index
        virtual column.

        Returning ``True`` from this function will cause a row index column to
        be created and passed to the ``append`` functions in the usual manner.
        """
        return False

    def validate(self, in_dshape):
        if self.column == SpecialColumn.RowIndex:
            return
        if self.column not in in_dshape.dict:
            raise ValueError("specified column not found")
        if not isnumeric(in_dshape.measure[self.column]):
            raise ValueError("input must be numeric")

    def inputs(self):
        return (extract(self.column),)

    def is_categorical(self):
        """Return ``True`` if this is or contains a categorical reduction."""
        return False

    def is_where(self):
        """Return ``True`` if this is a ``where`` reduction or directly wraps
        a where reduction."""
        return False

    def _antialias_requires_2_stages(self):
        # Return True if this Reduction must be processed with 2 stages,
        # False if it doesn't matter.
        # Overridden in derived classes as appropriate.
        return False

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        # Only called if using antialiased lines. Overridden in derived classes.
        # Returns a tuple containing an item for each constituent reduction.
        # Each item is (AntialiasCombination, zero_value).
        raise NotImplementedError(f"{type(self)}._antialias_stage_2 is not defined")

    def _build_bases(self, cuda, partitioned):
        return (self,)

    def _build_combine_temps(self, cuda, partitioned):
        # Temporaries (i.e. not returned to user) that are reductions, the
        # aggs of which are passed to the combine() function but not the
        # append() functions, as opposed to _build_temps() which are passed
        # to both append() and combine().
        return ()

    def _build_temps(self, cuda=False):
        # Temporaries (i.e. not returned to user) that are reductions, the
        # aggs of which are passed to both append() and combine() functions.
        return ()

    def _build_create(self, required_dshape):
        fields = getattr(required_dshape.measure, "fields", None)
        if fields is not None and len(required_dshape.measure.fields) > 0:
            # If more than one field then they all have the same dtype so can just take the first.
            first_field = required_dshape.measure.fields[0]
            required_dshape = dshape(first_field[1])

        if isinstance(required_dshape, Option):
            required_dshape = dshape(required_dshape.ty)

        if required_dshape == dshape(ct.bool_):
            return self._create_bool
        elif required_dshape == dshape(ct.float32):
            return self._create_float32_nan
        elif required_dshape == dshape(ct.float64):
            return self._create_float64_nan
        elif required_dshape == dshape(ct.int64):
            return self._create_int64
        elif required_dshape == dshape(ct.uint32):
            return self._create_uint32
        else:
            raise NotImplementedError(f"Unexpected dshape {required_dshape}")

    def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
        if cuda:
            if antialias and self.column is None:
                return self._append_no_field_antialias_cuda
            elif antialias:
                return self._append_antialias_cuda
            elif self.column is None:
                return self._append_no_field_cuda
            else:
                return self._append_cuda
        else:
            if antialias and self.column is None:
                return self._append_no_field_antialias
            elif antialias:
                return self._append_antialias
            elif self.column is None:
                return self._append_no_field
            else:
                return self._append

    def _build_combine(self, dshape, antialias, cuda, partitioned, categorical=False):
        return self._combine

    def _build_finalize(self, dshape):
        return self._finalize

    def _create_bool(shape, array_module):
        return array_module.zeros(shape, dtype='bool')

    def _create_float32_nan(shape, array_module):
        return array_module.full(shape, array_module.nan, dtype='f4')

    def _create_float64_nan(shape, array_module):
        return array_module.full(shape, array_module.nan, dtype='f8')

    def _create_float64_empty(shape, array_module):
        return array_module.empty(shape, dtype='f8')

    def _create_float64_zero(shape, array_module):
        return array_module.zeros(shape, dtype='f8')

    def _create_int64(shape, array_module):
        return array_module.full(shape, -1, dtype='i8')

    def _create_uint32(shape, array_module):
        return array_module.zeros(shape, dtype='u4')


class OptionalFieldReduction(Reduction):
    """Base class for things like ``count`` or ``any`` for which the field is optional"""
    def __init__(self, column=None):
        super().__init__(column)

    def inputs(self):
        return (extract(self.column),) if self.column is not None else ()

    def validate(self, in_dshape):
        if self.column is not None:
            super().validate(in_dshape)

    def _finalize(bases, cuda=False, **kwargs):
        return xr.DataArray(bases[0], **kwargs)


class SelfIntersectingOptionalFieldReduction(OptionalFieldReduction):
    """
    Base class for optional field reductions for which self-intersecting
    geometry may or may not be desirable.
    Ignored if not using antialiasing.
    """
    def __init__(self, column=None, self_intersect=True):
        super().__init__(column)
        self.self_intersect = self_intersect

    def _antialias_requires_2_stages(self):
        return not self.self_intersect

    def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
        if antialias and not self_intersect:
            # append functions specific to antialiased lines without self_intersect
            if cuda:
                if self.column is None:
                    return self._append_no_field_antialias_cuda_not_self_intersect
                else:
                    return self._append_antialias_cuda_not_self_intersect
            else:
                if self.column is None:
                    return self._append_no_field_antialias_not_self_intersect
                else:
                    return self._append_antialias_not_self_intersect

        # Fall back to base class implementation
        return super()._build_append(dshape, schema, cuda, antialias, self_intersect)

    def _hashable_inputs(self):
        # Reductions with different self_intersect attributes must have different hashes,
        # otherwise toolz.memoize will treat them as the same and give incorrect results.
        return super()._hashable_inputs() + (self.self_intersect,)


class count(SelfIntersectingOptionalFieldReduction):
    """Count elements in each bin, returning the result as a uint32, or a
    float32 if using antialiasing.

    Parameters
    ----------
    column : str, optional
        If provided, only counts elements in ``column`` that are not ``NaN``.
        Otherwise, counts every element.
    """
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(ct.float32) if antialias else dshape(ct.uint32)

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        if self_intersect:
            return (AntialiasStage2(AntialiasCombination.SUM_1AGG, array_module.nan),)
        else:
            return (AntialiasStage2(AntialiasCombination.SUM_2AGG, array_module.nan),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            agg[y, x] += 1
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        if not isnull(field):
            if isnull(agg[y, x]):
                agg[y, x] = aa_factor - prev_aa_factor
            else:
                agg[y, x] += aa_factor - prev_aa_factor
            return 0
        return -1

    def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor, prev_aa_factor):
        if not isnull(field):
            if isnull(agg[y, x]) or aa_factor > agg[y, x]:
                agg[y, x] = aa_factor
                return 0
        return -1

    def _append_no_field(x, y, agg):
        agg[y, x] += 1
        return 0

    def _append_no_field_antialias(x, y, agg, aa_factor, prev_aa_factor):
        if isnull(agg[y, x]):
            agg[y, x] = aa_factor - prev_aa_factor
        else:
            agg[y, x] += aa_factor - prev_aa_factor
        return 0

    def _append_no_field_antialias_not_self_intersect(x, y, agg, aa_factor, prev_aa_factor):
        if isnull(agg[y, x]) or aa_factor > agg[y, x]:
            agg[y, x] = aa_factor
            return 0
        return -1

    # GPU append functions
    def _append_antialias_cuda(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            old = cuda_atomic_nanmax(agg, (y, x), value)
            if isnull(old) or old < value:
                return 0
        return -1

    def _append_no_field_antialias_cuda_not_self_intersect(x, y, agg, aa_factor, prev_aa_factor):
        if not isnull(aa_factor):
            old = cuda_atomic_nanmax(agg, (y, x), aa_factor)
            if isnull(old) or old < aa_factor:
                return 0
        return -1

    def _append_cuda(x, y, agg, field):
        if not isnull(field):
            nb_cuda.atomic.add(agg, (y, x), 1)
            return 0
        return -1

    def _append_no_field_antialias_cuda(x, y, agg, aa_factor, prev_aa_factor):
        if not isnull(aa_factor):
            old = cuda_atomic_nanmax(agg, (y, x), aa_factor)
            if isnull(old) or old < aa_factor:
                return 0
        return -1

    def _append_no_field_cuda(x, y, agg):
        nb_cuda.atomic.add(agg, (y, x), 1)
        return 0

    def _build_combine(self, dshape, antialias, cuda, partitioned, categorical=False):
        if antialias:
            return self._combine_antialias
        else:
            return self._combine

    def _combine(aggs):
        return aggs.sum(axis=0, dtype='u4')

    def _combine_antialias(aggs):
        ret = aggs[0]
        for i in range(1, len(aggs)):
            nansum_in_place(ret, aggs[i])
        return ret
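
# Illustrative usage sketch (not part of the module): a plain per-pixel count.
# `cvs`, `df`, 'x' and 'y' are assumed names used only for this example.
#
#   >>> import datashader as ds                              # doctest: +SKIP
#   >>> cvs = ds.Canvas(plot_width=400, plot_height=300)     # doctest: +SKIP
#   >>> agg = cvs.points(df, 'x', 'y', agg=ds.count())       # doctest: +SKIP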


class _count_ignore_antialiasing(count):
    """Count reduction that ignores antialiasing. Used by the mean reduction."""
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(ct.uint32)

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        if self_intersect:
            return (AntialiasStage2(AntialiasCombination.SUM_1AGG, 0),)
        else:
            return (AntialiasStage2(AntialiasCombination.SUM_2AGG, 0),)

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        if not isnull(field) and prev_aa_factor == 0.0:
            agg[y, x] += 1
            return 0
        return -1

    def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor, prev_aa_factor):
        if not isnull(field) and prev_aa_factor == 0.0:
            agg[y, x] += 1
            return 0
        return -1


class by(Reduction):
    """Apply the provided reduction separately per category.

    Parameters
    ----------
    cats: str or CategoryPreprocess instance
        Name of column to aggregate over, or a categorizer object that returns categories.
        Resulting aggregate has an outer dimension axis along the categories present.
    reduction : Reduction
        Per-category reduction function.
    """
    def __init__(self, cat_column, reduction=count()):
        super().__init__()

        # set basic categorizer
        if isinstance(cat_column, CategoryPreprocess):
            self.categorizer = cat_column
        elif isinstance(cat_column, str):
            self.categorizer = category_codes(cat_column)
        else:
            raise TypeError("first argument must be a column name or a CategoryPreprocess "
                            "instance")

        self.column = self.categorizer.column  # for backwards compatibility with count_cat

        self.columns = (self.categorizer.column,)
        if (columns := getattr(reduction, 'columns', None)) is not None:
            # Must reverse columns (from where reduction) so that val_column property
            # is the column that is returned to the user.
            self.columns += columns[::-1]
        else:
            self.columns += (getattr(reduction, 'column', None),)

        self.reduction = reduction
        # if a value column is supplied, set category_values preprocessor
        if self.val_column is not None:
            self.preprocess = category_values(self.categorizer, self.val_column)
        else:
            self.preprocess = self.categorizer

    def __hash__(self):
        return hash((type(self), self._hashable_inputs(), self.categorizer._hashable_inputs(),
                     self.reduction))

    def _build_temps(self, cuda=False):
        return tuple(by(self.categorizer, tmp) for tmp in self.reduction._build_temps(cuda))

    @property
    def cat_column(self):
        return self.columns[0]

    @property
    def val_column(self):
        return self.columns[1]

    def validate(self, in_dshape):
        self.preprocess.validate(in_dshape)
        self.reduction.validate(in_dshape)

    def out_dshape(self, input_dshape, antialias, cuda, partitioned):
        cats = self.categorizer.categories(input_dshape)
        red_shape = self.reduction.out_dshape(input_dshape, antialias, cuda, partitioned)
        return dshape(Record([(c, red_shape) for c in cats]))

    def inputs(self):
        return (self.preprocess,)

    def is_categorical(self):
        return True

    def is_where(self):
        return self.reduction.is_where()

    def nan_check_column(self):
        return self.reduction.nan_check_column()

    def uses_cuda_mutex(self) -> UsesCudaMutex:
        return self.reduction.uses_cuda_mutex()

    def uses_row_index(self, cuda, partitioned):
        return self.reduction.uses_row_index(cuda, partitioned)

    def _antialias_requires_2_stages(self):
        return self.reduction._antialias_requires_2_stages()

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        ret = self.reduction._antialias_stage_2(self_intersect, array_module)
        return (AntialiasStage2(combination=ret[0].combination,
                                zero=ret[0].zero,
                                n_reduction=ret[0].n_reduction,
                                categorical=True),)

    def _build_create(self, required_dshape):
        n_cats = len(required_dshape.measure.fields)
        return lambda shape, array_module: self.reduction._build_create(
            required_dshape)(shape + (n_cats,), array_module)

    def _build_bases(self, cuda, partitioned):
        bases = self.reduction._build_bases(cuda, partitioned)
        if len(bases) == 1 and bases[0] is self:
            return bases
        return tuple(by(self.categorizer, base) for base in bases)

    def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
        return self.reduction._build_append(dshape, schema, cuda, antialias, self_intersect)

    def _build_combine(self, dshape, antialias, cuda, partitioned, categorical=False):
        return self.reduction._build_combine(dshape, antialias, cuda, partitioned, True)

    def _build_combine_temps(self, cuda, partitioned):
        return self.reduction._build_combine_temps(cuda, partitioned)

    def _build_finalize(self, dshape):
        cats = list(self.categorizer.categories(dshape))

        def finalize(bases, cuda=False, **kwargs):
            # Return a modified copy of kwargs. Cannot modify supplied kwargs as it
            # may be used by multiple reductions, e.g. if a summary reduction.
            kwargs = copy.deepcopy(kwargs)
            kwargs['dims'] += [self.cat_column]
            kwargs['coords'][self.cat_column] = cats
            return self.reduction._build_finalize(dshape)(bases, cuda=cuda, **kwargs)

        return finalize
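
# Illustrative usage sketch (not part of the module): one aggregate per category.
# `cvs` and `df` are assumed names; 'cat' is assumed to be a pandas categorical column.
#
#   >>> import datashader as ds                                        # doctest: +SKIP
#   >>> agg = cvs.points(df, 'x', 'y', agg=ds.by('cat', ds.sum('z')))  # doctest: +SKIP
#   >>> agg.sel(cat='a')  # 2D aggregate for a single category         # doctest: +SKIP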


class any(OptionalFieldReduction):
    """Whether any elements in ``column`` map to each bin.

    Parameters
    ----------
    column : str, optional
        If provided, any elements in ``column`` that are ``NaN`` are skipped.
    """
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(ct.float32) if antialias else dshape(ct.bool_)

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.MAX, array_module.nan),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            agg[y, x] = True
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        if not isnull(field):
            if isnull(agg[y, x]) or aa_factor > agg[y, x]:
                agg[y, x] = aa_factor
                return 0
        return -1

    def _append_no_field(x, y, agg):
        agg[y, x] = True
        return 0

    def _append_no_field_antialias(x, y, agg, aa_factor, prev_aa_factor):
        if isnull(agg[y, x]) or aa_factor > agg[y, x]:
            agg[y, x] = aa_factor
            return 0
        return -1

    # GPU append functions
    _append_cuda = _append
    _append_no_field_cuda = _append_no_field

    def _build_combine(self, dshape, antialias, cuda, partitioned, categorical=False):
        if antialias:
            return self._combine_antialias
        else:
            return self._combine

    def _combine(aggs):
        return aggs.sum(axis=0, dtype='bool')

    def _combine_antialias(aggs):
        ret = aggs[0]
        for i in range(1, len(aggs)):
            nanmax_in_place(ret, aggs[i])
        return ret


class _upsample(Reduction):
    """Special internal class used for upsampling"""
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(Option(ct.float64))

    def _finalize(bases, cuda=False, **kwargs):
        return xr.DataArray(bases[0], **kwargs)

    def inputs(self):
        return (extract(self.column),)

    def _build_create(self, required_dshape):
        # Use uninitialized memory, the upsample function must explicitly set unused
        # values to nan
        return self._create_float64_empty

    def _append(x, y, agg, field):
        # not called, the upsample function must set agg directly
        pass

    def _append_cuda(x, y, agg, field):
        # not called, the upsample function must set agg directly
        pass

    def _combine(aggs):
        return np.nanmax(aggs, axis=0)


class FloatingReduction(Reduction):
    """Base class for reductions that always have a floating-point dtype."""
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(Option(ct.float64))

    def _finalize(bases, cuda=False, **kwargs):
        return xr.DataArray(bases[0], **kwargs)


class _sum_zero(FloatingReduction):
    """Sum of all elements in ``column``.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
    """
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        if self_intersect:
            return (AntialiasStage2(AntialiasCombination.SUM_1AGG, 0),)
        else:
            return (AntialiasStage2(AntialiasCombination.SUM_2AGG, 0),)

    def _build_create(self, required_dshape):
        return self._create_float64_zero

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            # agg[y, x] cannot be null as initialised to zero.
            agg[y, x] += field
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*(aa_factor - prev_aa_factor)
        if not isnull(value):
            # agg[y, x] cannot be null as initialised to zero.
            agg[y, x] += value
            return 0
        return -1

    def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value) and value > agg[y, x]:
            # agg[y, x] cannot be null as initialised to zero.
            agg[y, x] = value
            return 0
        return -1

    # GPU append functions
    def _append_cuda(x, y, agg, field):
        if not isnull(field):
            nb_cuda.atomic.add(agg, (y, x), field)
            return 0
        return -1

    def _combine(aggs):
        return aggs.sum(axis=0, dtype='f8')


class SelfIntersectingFloatingReduction(FloatingReduction):
    """
    Base class for floating reductions for which self-intersecting geometry
    may or may not be desirable.
    Ignored if not using antialiasing.
    """
    def __init__(self, column=None, self_intersect=True):
        super().__init__(column)
        self.self_intersect = self_intersect

    def _antialias_requires_2_stages(self):
        return not self.self_intersect

    def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
        if antialias and not self_intersect:
            if cuda:
                raise NotImplementedError("SelfIntersectingOptionalFieldReduction")
            else:
                if self.column is None:
                    return self._append_no_field_antialias_not_self_intersect
                else:
                    return self._append_antialias_not_self_intersect

        return super()._build_append(dshape, schema, cuda, antialias, self_intersect)

    def _hashable_inputs(self):
        # Reductions with different self_intersect attributes must have different hashes,
        # otherwise toolz.memoize will treat them as the same and give incorrect results.
        return super()._hashable_inputs() + (self.self_intersect,)


class sum(SelfIntersectingFloatingReduction):
    """Sum of all elements in ``column``.

    Elements of resulting aggregate are nan if they are not updated.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        if self_intersect:
            return (AntialiasStage2(AntialiasCombination.SUM_1AGG, array_module.nan),)
        else:
            return (AntialiasStage2(AntialiasCombination.SUM_2AGG, array_module.nan),)

    def _build_bases(self, cuda, partitioned):
        if cuda:
            return (_sum_zero(self.column), any(self.column))
        else:
            return (self,)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            if isnull(agg[y, x]):
                agg[y, x] = field
            else:
                agg[y, x] += field
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*(aa_factor - prev_aa_factor)
        if not isnull(value):
            if isnull(agg[y, x]):
                agg[y, x] = value
            else:
                agg[y, x] += value
            return 0
        return -1

    def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            if isnull(agg[y, x]) or value > agg[y, x]:
                agg[y, x] = value
                return 0
        return -1

    def _combine(aggs):
        return nansum_missing(aggs, axis=0)

    def _finalize(bases, cuda=False, **kwargs):
        if cuda:
            sums, anys = bases
            x = np.where(anys, sums, np.nan)
            return xr.DataArray(x, **kwargs)
        else:
            return xr.DataArray(bases[0], **kwargs)


class m2(FloatingReduction):
    """Sum of square differences from the mean of all elements in ``column``.

    Intermediate value for computing ``var`` and ``std``, not intended to be
    used on its own.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def uses_cuda_mutex(self) -> UsesCudaMutex:
        return UsesCudaMutex.Global

    def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
        return super(m2, self)._build_append(dshape, schema, cuda, antialias, self_intersect)

    def _build_create(self, required_dshape):
        return self._create_float64_zero

    def _build_temps(self, cuda=False):
        return (_sum_zero(self.column), count(self.column))

    # CPU append functions
    def _append(x, y, m2, field, sum, count):
        # sum & count are the results of sum[y, x], count[y, x] before being
        # updated by field
        if not isnull(field):
            if count > 0:
                u1 = np.float64(sum) / count
                u = np.float64(sum + field) / (count + 1)
                m2[y, x] += (field - u1) * (field - u)
            return 0
        return -1

    # GPU append functions
    def _append_cuda(x, y, m2, field, sum, count):
        # sum & count are the results of sum[y, x], count[y, x] before being
        # updated by field
        if not isnull(field):
            if count > 0:
                u1 = np.float64(sum) / count
                u = np.float64(sum + field) / (count + 1)
                m2[y, x] += (field - u1) * (field - u)
            return 0
        return -1

    def _combine(Ms, sums, ns):
        with np.errstate(divide='ignore', invalid='ignore'):
            mu = np.nansum(sums, axis=0) / ns.sum(axis=0)
            return np.nansum(Ms + ns*(sums/ns - mu)**2, axis=0)
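
# Worked example (comments only, not executed) of the per-partition combine used in
# m2._combine above. For data [1, 2, 3] | [4, 5] split across two partitions:
#   partition A: n=3, sum=6, M2=2.0 (mean 2.0)
#   partition B: n=2, sum=9, M2=0.5 (mean 4.5)
#   overall mean mu = (6 + 9) / (3 + 2) = 3.0
#   combined M2 = 2.0 + 3*(2.0 - 3.0)**2 + 0.5 + 2*(4.5 - 3.0)**2 = 10.0
# which equals sum((x - 3.0)**2) over the full data [1, 2, 3, 4, 5].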


class min(FloatingReduction):
    """Minimum value of all elements in ``column``.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def _antialias_requires_2_stages(self):
        return True

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.MIN, array_module.nan),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field) and (isnull(agg[y, x]) or agg[y, x] > field):
            agg[y, x] = field
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]):
            agg[y, x] = value
            return 0
        return -1

    # GPU append functions
    def _append_cuda(x, y, agg, field):
        if not isnull(field):
            old = cuda_atomic_nanmin(agg, (y, x), field)
            if isnull(old) or old > field:
                return 0
        return -1

    def _combine(aggs):
        return np.nanmin(aggs, axis=0)


class max(FloatingReduction):
    """Maximum value of all elements in ``column``.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.MAX, array_module.nan),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field) and (isnull(agg[y, x]) or agg[y, x] < field):
            agg[y, x] = field
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]):
            agg[y, x] = value
            return 0
        return -1

    # GPU append functions
    def _append_antialias_cuda(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            old = cuda_atomic_nanmax(agg, (y, x), value)
            if isnull(old) or old < value:
                return 0
        return -1

    def _append_cuda(x, y, agg, field):
        if not isnull(field):
            old = cuda_atomic_nanmax(agg, (y, x), field)
            if isnull(old) or old < field:
                return 0
        return -1

    def _combine(aggs):
        return np.nanmax(aggs, axis=0)


class count_cat(by):
    """Count of all elements in ``column``, grouped by category.

    Alias for ``by(..., count())``, for backwards compatibility.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be
        categorical. Resulting aggregate has an outer dimension axis along the
        categories present.
    """
    def __init__(self, column):
        super(count_cat, self).__init__(column, count())


class mean(Reduction):
    """Mean of all elements in ``column``.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def _build_bases(self, cuda, partitioned):
        return (_sum_zero(self.column), _count_ignore_antialiasing(self.column))

    def _finalize(bases, cuda=False, **kwargs):
        sums, counts = bases
        with np.errstate(divide='ignore', invalid='ignore'):
            x = np.where(counts > 0, sums/counts, np.nan)
        return xr.DataArray(x, **kwargs)


class var(Reduction):
    """Variance of all elements in ``column``.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def _build_bases(self, cuda, partitioned):
        return (_sum_zero(self.column), count(self.column), m2(self.column))

    def _finalize(bases, cuda=False, **kwargs):
        sums, counts, m2s = bases
        with np.errstate(divide='ignore', invalid='ignore'):
            x = np.where(counts > 0, m2s / counts, np.nan)
        return xr.DataArray(x, **kwargs)


class std(Reduction):
    """Standard deviation of all elements in ``column``.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. Column data type must be numeric.
        ``NaN`` values in the column are skipped.
    """
    def _build_bases(self, cuda, partitioned):
        return (_sum_zero(self.column), count(self.column), m2(self.column))

    def _finalize(bases, cuda=False, **kwargs):
        sums, counts, m2s = bases
        with np.errstate(divide='ignore', invalid='ignore'):
            x = np.where(counts > 0, np.sqrt(m2s / counts), np.nan)
        return xr.DataArray(x, **kwargs)


class _first_or_last(Reduction):
    """Abstract base class of first and last reductions."""
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(ct.float64)

    def uses_row_index(self, cuda, partitioned):
        return cuda or partitioned

    def _antialias_requires_2_stages(self):
        return True

    def _build_bases(self, cuda, partitioned):
        if self.uses_row_index(cuda, partitioned):
            row_index_selector = self._create_row_index_selector()
            wrapper = where(selector=row_index_selector, lookup_column=self.column)
            wrapper._nan_check_column = self.column
            # where reduction is always preceded by its selector reduction
            return row_index_selector._build_bases(cuda, partitioned) + (wrapper,)
        else:
            return super()._build_bases(cuda, partitioned)

    def _combine(aggs):
        # Dask combine is handled by a where reduction using a row index.
        # Hence this can only ever be called if npartitions == 1 in which case len(aggs) == 1.
        if len(aggs) > 1:
            raise RuntimeError("_combine should never be called with more than one agg")
        return aggs[0]

    def _create_row_index_selector(self):
        pass

    def _finalize(bases, cuda=False, **kwargs):
        # Note returning the last of the bases which is correct regardless of whether
        # this is a simple reduction (with a single base) or a compound where reduction
        # (with 2 bases, the second of which is the where reduction).
        return xr.DataArray(bases[-1], **kwargs)


class first(_first_or_last):
    """First value encountered in ``column``.

    Useful for categorical data where an actual value must always be returned,
    not an average or other numerical calculation.

    Currently only supported for rasters, externally to this class.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. If the data type is floating point,
        ``NaN`` values in the column are skipped.
    """
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.FIRST, array_module.nan),)

    def _append(x, y, agg, field):
        if not isnull(field) and isnull(agg[y, x]):
            agg[y, x] = field
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]):
            agg[y, x] = value
            return 0
        return -1

    def _create_row_index_selector(self):
        return _min_row_index()


class last(_first_or_last):
    """Last value encountered in ``column``.

    Useful for categorical data where an actual value must always be returned,
    not an average or other numerical calculation.

    Currently only supported for rasters, externally to this class.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. If the data type is floating point,
        ``NaN`` values in the column are skipped.
    """
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.LAST, array_module.nan),)

    def _append(x, y, agg, field):
        if not isnull(field):
            agg[y, x] = field
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value) and (isnull(agg[y, x]) or value > agg[y, x]):
            agg[y, x] = value
            return 0
        return -1

    def _create_row_index_selector(self):
        return _max_row_index()


class FloatingNReduction(OptionalFieldReduction):
    def __init__(self, column=None, n=1):
        super().__init__(column)
        self.n = n if n >= 1 else 1

    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(ct.float64)

    def _add_finalize_kwargs(self, **kwargs):
        # Add the new dimension and coordinate.
        n_name = "n"
        n_values = np.arange(self.n)

        # Return a modified copy of kwargs. Cannot modify supplied kwargs as it
        # may be used by multiple reductions, e.g. if a summary reduction.
        kwargs = copy.deepcopy(kwargs)
        kwargs['dims'] += [n_name]
        kwargs['coords'][n_name] = n_values
        return kwargs

    def _build_create(self, required_dshape):
        return lambda shape, array_module: super(FloatingNReduction, self)._build_create(
            required_dshape)(shape + (self.n,), array_module)

    def _build_finalize(self, dshape):
        def finalize(bases, cuda=False, **kwargs):
            kwargs = self._add_finalize_kwargs(**kwargs)
            return self._finalize(bases, cuda=cuda, **kwargs)

        return finalize

    def _hashable_inputs(self):
        return super()._hashable_inputs() + (self.n,)


class _first_n_or_last_n(FloatingNReduction):
    """Abstract base class of first_n and last_n reductions."""
    def uses_row_index(self, cuda, partitioned):
        return cuda or partitioned

    def _antialias_requires_2_stages(self):
        return True

    def _build_bases(self, cuda, partitioned):
        if self.uses_row_index(cuda, partitioned):
            row_index_selector = self._create_row_index_selector()
            wrapper = where(selector=row_index_selector, lookup_column=self.column)
            wrapper._nan_check_column = self.column
            # where reduction is always preceded by its selector reduction
            return row_index_selector._build_bases(cuda, partitioned) + (wrapper,)
        else:
            return super()._build_bases(cuda, partitioned)

    def _combine(aggs):
        # Dask combine is handled by a where reduction using a row index.
        # Hence this can only ever be called if npartitions == 1 in which case len(aggs) == 1.
        if len(aggs) > 1:
            raise RuntimeError("_combine should never be called with more than one agg")
        return aggs[0]

    def _create_row_index_selector(self):
        pass

    def _finalize(bases, cuda=False, **kwargs):
        # Note returning the last of the bases which is correct regardless of whether
        # this is a simple reduction (with a single base) or a compound where reduction
        # (with 2 bases, the second of which is the where reduction).
        return xr.DataArray(bases[-1], **kwargs)


class first_n(_first_n_or_last_n):
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.FIRST, array_module.nan, n_reduction=True),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            # Check final value first for quick abort.
            n = agg.shape[2]
            if not isnull(agg[y, x, n-1]):
                return -1

            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            for i in range(n):
                if isnull(agg[y, x, i]):
                    # Nothing to shift.
                    agg[y, x, i] = field
                    return i
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            # Check final value first for quick abort.
            n = agg.shape[2]
            if not isnull(agg[y, x, n-1]):
                return -1

            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            for i in range(n):
                if isnull(agg[y, x, i]):
                    # Nothing to shift.
                    agg[y, x, i] = value
                    return i
        return -1

    def _create_row_index_selector(self):
        return _min_n_row_index(n=self.n)


class last_n(_first_n_or_last_n):
    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.LAST, array_module.nan, n_reduction=True),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            # Always inserts at front of agg's third dimension.
            shift_and_insert(agg[y, x], field, 0)
            return 0
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            # Always inserts at front of agg's third dimension.
            shift_and_insert(agg[y, x], value, 0)
            return 0
        return -1

    def _create_row_index_selector(self):
        return _max_n_row_index(n=self.n)


class max_n(FloatingNReduction):
    def uses_cuda_mutex(self) -> UsesCudaMutex:
        return UsesCudaMutex.Local

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.MAX, array_module.nan, n_reduction=True),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
            for i in range(n):
                if isnull(agg[y, x, i]) or field > agg[y, x, i]:
                    shift_and_insert(agg[y, x], field, i)
                    return i
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
            for i in range(n):
                if isnull(agg[y, x, i]) or value > agg[y, x, i]:
                    shift_and_insert(agg[y, x], value, i)
                    return i
        return -1

    # GPU append functions
    def _append_cuda(x, y, agg, field):
        if not isnull(field):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
            for i in range(n):
                if isnull(agg[y, x, i]) or field > agg[y, x, i]:
                    cuda_shift_and_insert(agg[y, x], field, i)
                    return i
        return -1

    def _build_combine(self, dshape, antialias, cuda, partitioned, categorical=False):
        if cuda:
            return self._combine_cuda
        else:
            return self._combine

    def _combine(aggs):
        ret = aggs[0]
        for i in range(1, len(aggs)):
            if ret.ndim == 3:  # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
                nanmax_n_in_place_3d(aggs[0], aggs[i])
            else:
                nanmax_n_in_place_4d(aggs[0], aggs[i])
        return ret

    def _combine_cuda(aggs):
        ret = aggs[0]
        kernel_args = cuda_args(ret.shape[:-1])
        for i in range(1, len(aggs)):
            if ret.ndim == 3:  # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
                cuda_nanmax_n_in_place_3d[kernel_args](aggs[0], aggs[i])
            else:
                cuda_nanmax_n_in_place_4d[kernel_args](aggs[0], aggs[i])
        return ret
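
# Illustrative usage sketch (not part of the module): keep the 3 largest values per pixel.
# `cvs` and `df` are assumed names; the result gains an extra "n" dimension of length 3.
#
#   >>> import datashader as ds                                      # doctest: +SKIP
#   >>> agg = cvs.points(df, 'x', 'y', agg=ds.max_n('value', n=3))   # doctest: +SKIP
#   >>> agg.isel(n=0)  # equivalent to ds.max('value')               # doctest: +SKIP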


class min_n(FloatingNReduction):
    def uses_cuda_mutex(self) -> UsesCudaMutex:
        return UsesCudaMutex.Local

    def _antialias_requires_2_stages(self):
        return True

    def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
        return (AntialiasStage2(AntialiasCombination.MIN, array_module.nan, n_reduction=True),)

    # CPU append functions
    def _append(x, y, agg, field):
        if not isnull(field):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
            for i in range(n):
                if isnull(agg[y, x, i]) or field < agg[y, x, i]:
                    shift_and_insert(agg[y, x], field, i)
                    return i
        return -1

    def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor):
        value = field*aa_factor
        if not isnull(value):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
            for i in range(n):
                if isnull(agg[y, x, i]) or value < agg[y, x, i]:
                    shift_and_insert(agg[y, x], value, i)
                    return i
        return -1

    # GPU append functions
    def _append_cuda(x, y, agg, field):
        if not isnull(field):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
            for i in range(n):
                if isnull(agg[y, x, i]) or field < agg[y, x, i]:
                    cuda_shift_and_insert(agg[y, x], field, i)
                    return i
        return -1

    def _build_combine(self, dshape, antialias, cuda, partitioned, categorical=False):
        if cuda:
            return self._combine_cuda
        else:
            return self._combine

    def _combine(aggs):
        ret = aggs[0]
        for i in range(1, len(aggs)):
            if ret.ndim == 3:  # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
                nanmin_n_in_place_3d(aggs[0], aggs[i])
            else:
                nanmin_n_in_place_4d(aggs[0], aggs[i])
        return ret

    def _combine_cuda(aggs):
        ret = aggs[0]
        kernel_args = cuda_args(ret.shape[:-1])
        for i in range(1, len(aggs)):
            if ret.ndim == 3:  # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
                cuda_nanmin_n_in_place_3d[kernel_args](aggs[0], aggs[i])
            else:
                cuda_nanmin_n_in_place_4d[kernel_args](aggs[0], aggs[i])
        return ret


class mode(Reduction):
    """Mode (most common value) of all the values encountered in ``column``.

    Useful for categorical data where an actual value must always be returned,
    not an average or other numerical calculation.

    Currently only supported for rasters, externally to this class.
    Implementing it for other glyph types would be difficult due to potentially
    unbounded data storage requirements to store indefinite point or line
    data per pixel.

    Parameters
    ----------
    column : str
        Name of the column to aggregate over. If the data type is floating point,
        ``NaN`` values in the column are skipped.
    """
    def out_dshape(self, in_dshape, antialias, cuda, partitioned):
        return dshape(Option(ct.float64))

    def _append(x, y, agg):
        raise NotImplementedError("mode is currently implemented only for rasters")

    def _combine(aggs):
        raise NotImplementedError("mode is currently implemented only for rasters")

    def _finalize(bases, **kwargs):
        raise NotImplementedError("mode is currently implemented only for rasters")
| class where(FloatingReduction): | |
| """ | |
| Returns values from a ``lookup_column`` corresponding to a ``selector`` | |
| reduction that is applied to some other column. | |
| If ``lookup_column`` is ``None`` then it uses the index of the row in the | |
| DataFrame instead of a named column. This is returned as an int64 | |
| aggregation with -1 used to denote no value. | |
| Examples | |
| -------- | |
| >>> canvas.line(df, 'x', 'y', agg=ds.where(ds.max("value"), "other")) # doctest: +SKIP | |
| This returns the values of the "other" column that correspond to the | |
| maximum of the "value" column in each bin. | |
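| A selector that is an ``n``-reduction returns multiple values per bin. For | |
| example (illustrative only, reusing the column names above): | |
| >>> canvas.line(df, 'x', 'y', agg=ds.where(ds.max_n("value", n=3), "other")) # doctest: +SKIP | |
| This returns the values of the "other" column corresponding to the three | |
| largest values of the "value" column in each bin. | |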
| Parameters | |
| ---------- | |
| selector: Reduction | |
| Reduction used to select the values of the ``lookup_column`` which are | |
| returned by this ``where`` reduction. | |
| lookup_column : str | None | |
| Column containing values that are returned from this ``where`` | |
| reduction, or ``None`` to return row indexes instead. | |
| """ | |
| def __init__(self, selector: Reduction, lookup_column: str | None=None): | |
| if not isinstance(selector, (first, first_n, last, last_n, max, max_n, min, min_n, | |
| _max_or_min_row_index, _max_n_or_min_n_row_index)): | |
| raise TypeError( | |
| "selector can only be a first, first_n, last, last_n, " | |
| "max, max_n, min or min_n reduction") | |
| if lookup_column is None: | |
| lookup_column = SpecialColumn.RowIndex | |
| super().__init__(lookup_column) | |
| self.selector = selector | |
| # List of all column names that this reduction uses. | |
| self.columns = (selector.column, lookup_column) | |
| def __hash__(self): | |
| return hash((type(self), self._hashable_inputs(), self.selector)) | |
| def is_where(self): | |
| return True | |
| def out_dshape(self, input_dshape, antialias, cuda, partitioned): | |
| if self.column == SpecialColumn.RowIndex: | |
| return dshape(ct.int64) | |
| else: | |
| return dshape(ct.float64) | |
| def uses_cuda_mutex(self) -> UsesCudaMutex: | |
| return UsesCudaMutex.Local | |
| def uses_row_index(self, cuda, partitioned): | |
| return (self.column == SpecialColumn.RowIndex or | |
| self.selector.uses_row_index(cuda, partitioned)) | |
| def validate(self, in_dshape): | |
| if self.column != SpecialColumn.RowIndex: | |
| super().validate(in_dshape) | |
| self.selector.validate(in_dshape) | |
| if self.column != SpecialColumn.RowIndex and self.column == self.selector.column: | |
| raise ValueError("where and its contained reduction cannot use the same column") | |
| def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: | |
| ret = self.selector._antialias_stage_2(self_intersect, array_module) | |
| if self.column == SpecialColumn.RowIndex: | |
| # Override antialiased zero value when returning integer row index. | |
| ret = (AntialiasStage2(combination=ret[0].combination, | |
| zero=-1, | |
| n_reduction=ret[0].n_reduction),) | |
| return ret | |
| # CPU append functions | |
| # All where._append* functions have an extra argument which is the update index. | |
| # For 3D aggs like max_n, this is the index of insertion in the final dimension, | |
| # and the previous values from this index upwards are shifted along to make room | |
| # for the new value. | |
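| # Illustrative example of the shift-and-insert behaviour described above: for | |
| # an agg row of [3.0, 5.0, 7.0], inserting 4.0 at update_index 1 shifts the | |
| # tail along to give [3.0, 4.0, 5.0], dropping the previous final value. | |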
| def _append(x, y, agg, field, update_index): | |
| if agg.ndim > 2: | |
| shift_and_insert(agg[y, x], field, update_index) | |
| else: | |
| agg[y, x] = field | |
| return update_index | |
| def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor, update_index): | |
| # Ignore aa_factor. | |
| if agg.ndim > 2: | |
| shift_and_insert(agg[y, x], field, update_index) | |
| else: | |
| agg[y, x] = field | |
| return update_index | |
| def _append_antialias_cuda(x, y, agg, field, aa_factor, prev_aa_factor, update_index): | |
| # Ignore aa_factor | |
| if agg.ndim > 2: | |
| cuda_shift_and_insert(agg[y, x], field, update_index) | |
| else: | |
| agg[y, x] = field | |
| return update_index | |
| def _append_cuda(x, y, agg, field, update_index): | |
| if agg.ndim > 2: | |
| cuda_shift_and_insert(agg[y, x], field, update_index) | |
| else: | |
| agg[y, x] = field | |
| return update_index | |
| def _build_append(self, dshape, schema, cuda, antialias, self_intersect): | |
| # If self.column is SpecialColumn.RowIndex then append function is passed a | |
| # 'field' argument which is the row index. | |
| if cuda: | |
| if antialias: | |
| return self._append_antialias_cuda | |
| else: | |
| return self._append_cuda | |
| else: | |
| if antialias: | |
| return self._append_antialias | |
| else: | |
| return self._append | |
| def _build_bases(self, cuda, partitioned): | |
| selector = self.selector | |
| if isinstance(selector, (_first_or_last, _first_n_or_last_n)) and \ | |
| selector.uses_row_index(cuda, partitioned): | |
| # Need to swap out the selector with an equivalent row index selector | |
| row_index_selector = selector._create_row_index_selector() | |
| if self.column == SpecialColumn.RowIndex: | |
| # If selector uses a row index and this where returns the same row index, | |
| # can just swap out this where reduction with the row_index_selector. | |
| row_index_selector._nan_check_column = self.selector.column | |
| return row_index_selector._build_bases(cuda, partitioned) | |
| else: | |
| new_where = where(row_index_selector, self.column) | |
| new_where._nan_check_column = self.selector.column | |
| return row_index_selector._build_bases(cuda, partitioned) + \ | |
| new_where._build_bases(cuda, partitioned) | |
| else: | |
| return selector._build_bases(cuda, partitioned) + \ | |
| super()._build_bases(cuda, partitioned) | |
| def _combine_callback(self, cuda, partitioned, categorical): | |
| # Used by: | |
| # 1) where._build_combine() below, the usual mechanism for combining aggs from | |
| # different dask partitions. | |
| # 2) make_antialias_stage_2_functions() in compiler.py to perform stage 2 combine | |
| # of antialiased aggs. | |
| selector = self.selector | |
| is_n_reduction = isinstance(selector, FloatingNReduction) | |
| if cuda: | |
| append = selector._append_cuda | |
| else: | |
| append = selector._append | |
| # If the selector uses a row_index then selector_aggs will be int64 with -1 | |
| # representing missing data. Otherwise missing data is NaN. | |
| invalid = isminus1 if self.selector.uses_row_index(cuda, partitioned) else isnull | |
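| # Each combine_* helper below folds exactly two partition results in place: | |
| # index 1 of aggs/selector_aggs is merged into index 0, with the selector's | |
| # append function deciding, per pixel, whether the incoming value wins. | |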
| def combine_cpu_2d(aggs, selector_aggs): | |
| ny, nx = aggs[0].shape | |
| for y in range(ny): | |
| for x in range(nx): | |
| value = selector_aggs[1][y, x] | |
| if not invalid(value) and append(x, y, selector_aggs[0], value) >= 0: | |
| aggs[0][y, x] = aggs[1][y, x] | |
| def combine_cpu_3d(aggs, selector_aggs): | |
| ny, nx, ncat = aggs[0].shape | |
| for y in range(ny): | |
| for x in range(nx): | |
| for cat in range(ncat): | |
| value = selector_aggs[1][y, x, cat] | |
| if not invalid(value) and append(x, y, selector_aggs[0][:, :, cat], | |
| value) >= 0: | |
| aggs[0][y, x, cat] = aggs[1][y, x, cat] | |
| def combine_cpu_n_3d(aggs, selector_aggs): | |
| ny, nx, n = aggs[0].shape | |
| for y in range(ny): | |
| for x in range(nx): | |
| for i in range(n): | |
| value = selector_aggs[1][y, x, i] | |
| if invalid(value): | |
| break | |
| update_index = append(x, y, selector_aggs[0], value) | |
| if update_index < 0: | |
| break | |
| shift_and_insert(aggs[0][y, x], aggs[1][y, x, i], update_index) | |
| def combine_cpu_n_4d(aggs, selector_aggs): | |
| ny, nx, ncat, n = aggs[0].shape | |
| for y in range(ny): | |
| for x in range(nx): | |
| for cat in range(ncat): | |
| for i in range(n): | |
| value = selector_aggs[1][y, x, cat, i] | |
| if invalid(value): | |
| break | |
| update_index = append(x, y, selector_aggs[0][:, :, cat, :], value) | |
| if update_index < 0: | |
| break | |
| shift_and_insert(aggs[0][y, x, cat], aggs[1][y, x, cat, i], | |
| update_index) | |
| def combine_cuda_2d(aggs, selector_aggs): | |
| ny, nx = aggs[0].shape | |
| x, y = nb_cuda.grid(2) | |
| if x < nx and y < ny: | |
| value = selector_aggs[1][y, x] | |
| if not invalid(value) and append(x, y, selector_aggs[0], value) >= 0: | |
| aggs[0][y, x] = aggs[1][y, x] | |
| def combine_cuda_3d(aggs, selector_aggs): | |
| ny, nx, ncat = aggs[0].shape | |
| x, y, cat = nb_cuda.grid(3) | |
| if x < nx and y < ny and cat < ncat: | |
| value = selector_aggs[1][y, x, cat] | |
| if not invalid(value) and append(x, y, selector_aggs[0][:, :, cat], value) >= 0: | |
| aggs[0][y, x, cat] = aggs[1][y, x, cat] | |
| def combine_cuda_n_3d(aggs, selector_aggs): | |
| ny, nx, n = aggs[0].shape | |
| x, y = nb_cuda.grid(2) | |
| if x < nx and y < ny: | |
| for i in range(n): | |
| value = selector_aggs[1][y, x, i] | |
| if invalid(value): | |
| break | |
| update_index = append(x, y, selector_aggs[0], value) | |
| if update_index < 0: | |
| break | |
| cuda_shift_and_insert(aggs[0][y, x], aggs[1][y, x, i], update_index) | |
| def combine_cuda_n_4d(aggs, selector_aggs): | |
| ny, nx, ncat, n = aggs[0].shape | |
| x, y, cat = nb_cuda.grid(3) | |
| if x < nx and y < ny and cat < ncat: | |
| for i in range(n): | |
| value = selector_aggs[1][y, x, cat, i] | |
| if invalid(value): | |
| break | |
| update_index = append(x, y, selector_aggs[0][:, :, cat, :], value) | |
| if update_index < 0: | |
| break | |
| cuda_shift_and_insert(aggs[0][y, x, cat], aggs[1][y, x, cat, i], update_index) | |
| if is_n_reduction: | |
| # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) | |
| if cuda: | |
| return combine_cuda_n_4d if categorical else combine_cuda_n_3d | |
| else: | |
| return combine_cpu_n_4d if categorical else combine_cpu_n_3d | |
| else: | |
| # ndim is either 2 (ny, nx) or 3 (ny, nx, ncat) | |
| if cuda: | |
| return combine_cuda_3d if categorical else combine_cuda_2d | |
| else: | |
| return combine_cpu_3d if categorical else combine_cpu_2d | |
| def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): | |
| combine = self._combine_callback(cuda, partitioned, categorical) | |
| def wrapped_combine(aggs, selector_aggs): | |
| if len(aggs) == 1: | |
| pass | |
| elif cuda: | |
| assert len(aggs) == 2 | |
| is_n_reduction = isinstance(self.selector, FloatingNReduction) | |
| shape = aggs[0].shape[:-1] if is_n_reduction else aggs[0].shape | |
| combine[cuda_args(shape)](aggs, selector_aggs) | |
| else: | |
| for i in range(1, len(aggs)): | |
| combine((aggs[0], aggs[i]), (selector_aggs[0], selector_aggs[i])) | |
| return aggs[0], selector_aggs[0] | |
| return wrapped_combine | |
| def _build_combine_temps(self, cuda, partitioned): | |
| return (self.selector,) | |
| def _build_create(self, required_dshape): | |
| # Return a function that when called with a shape creates an agg array | |
| # of the required type (numpy/cupy) and dtype. | |
| if isinstance(self.selector, FloatingNReduction): | |
| # This specialisation isn't ideal but Reduction classes do not | |
| # store information about the required extra dimension. | |
| return lambda shape, array_module: super(where, self)._build_create( | |
| required_dshape)(shape + (self.selector.n,), array_module) | |
| else: | |
| return super()._build_create(required_dshape) | |
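| # For example, where(max_n("value", n=3), "other") on an (ny, nx) canvas | |
| # creates an (ny, nx, 3) lookup aggregation here (column names illustrative). | |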
| def _build_finalize(self, dshape): | |
| if isinstance(self.selector, FloatingNReduction): | |
| add_finalize_kwargs = self.selector._add_finalize_kwargs | |
| else: | |
| add_finalize_kwargs = None | |
| def finalize(bases, cuda=False, **kwargs): | |
| if add_finalize_kwargs is not None: | |
| kwargs = add_finalize_kwargs(**kwargs) | |
| return xr.DataArray(bases[-1], **kwargs) | |
| return finalize | |
| class summary(Expr): | |
| """A collection of named reductions. | |
| Computes all aggregates simultaneously, with the output stored as an | |
| ``xarray.Dataset``. | |
| Examples | |
| -------- | |
| A reduction for computing the mean of column "a", and the sum of column "b" | |
| for each bin, all in a single pass. | |
| >>> import datashader as ds | |
| >>> red = ds.summary(mean_a=ds.mean('a'), sum_b=ds.sum('b')) | |
| Notes | |
| ----- | |
| A single pass of the source dataset using antialiased lines can either be | |
| performed using a single-stage aggregation (e.g. ``self_intersect=True``) | |
| or two stages (``self_intersect=False``). If a ``summary`` contains a | |
| ``count`` or ``sum`` reduction with ``self_intersect=False``, or any of | |
| ``first``, ``last`` or ``min``, then the antialiased line pass will be | |
| performed in two stages. | |
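| For example, the following summary would force the antialiased line pass to | |
| be performed in two stages because it contains a ``min`` reduction | |
| (illustrative column names): | |
| >>> red = ds.summary(total=ds.count(), lowest=ds.min('a'))  # doctest: +SKIP | |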
| """ | |
| def __init__(self, **kwargs): | |
| ks, vs = zip(*sorted(kwargs.items())) | |
| self.keys = ks | |
| self.values = vs | |
| def __hash__(self): | |
| return hash((type(self), tuple(self.keys), tuple(self.values))) | |
| def is_categorical(self): | |
| for v in self.values: | |
| if v.is_categorical(): | |
| return True | |
| return False | |
| def uses_row_index(self, cuda, partitioned): | |
| for v in self.values: | |
| if v.uses_row_index(cuda, partitioned): | |
| return True | |
| return False | |
| def validate(self, input_dshape): | |
| for v in self.values: | |
| v.validate(input_dshape) | |
| # Check that any included FloatingNReductions have the same n values. | |
| n_values = [] | |
| for v in self.values: | |
| if isinstance(v, where): | |
| v = v.selector | |
| if isinstance(v, FloatingNReduction): | |
| n_values.append(v.n) | |
| if len(np.unique(n_values)) > 1: | |
| raise ValueError( | |
| "Using multiple FloatingNReductions with different n values is not supported") | |
| def inputs(self): | |
| return tuple(unique(concat(v.inputs for v in self.values))) | |
| class _max_or_min_row_index(OptionalFieldReduction): | |
| """Abstract base class of max and min row_index reductions. | |
| """ | |
| def __init__(self): | |
| super().__init__(column=SpecialColumn.RowIndex) | |
| def out_dshape(self, in_dshape, antialias, cuda, partitioned): | |
| return dshape(ct.int64) | |
| def uses_row_index(self, cuda, partitioned): | |
| return True | |
| class _max_row_index(_max_or_min_row_index): | |
| """Max reduction operating on row index. | |
| This is a private class as it is not intended to be used explicitly in | |
| user code. Its primary purpose is to support the use of ``last`` | |
| reductions using dask and/or CUDA. | |
| """ | |
| def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: | |
| return (AntialiasStage2(AntialiasCombination.MAX, -1),) | |
| def _append(x, y, agg, field): | |
| # field is int64 row index | |
| if field > agg[y, x]: | |
| agg[y, x] = field | |
| return 0 | |
| return -1 | |
| def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor): | |
| # field is int64 row index | |
| # Ignore aa_factor | |
| if field > agg[y, x]: | |
| agg[y, x] = field | |
| return 0 | |
| return -1 | |
| # GPU append functions | |
| def _append_cuda(x, y, agg, field): | |
| # field is int64 row index | |
| if field != -1: | |
| old = nb_cuda.atomic.max(agg, (y, x), field) | |
| if old < field: | |
| return 0 | |
| return -1 | |
| def _combine(aggs): | |
| # Maximum ignoring -1 values | |
| # Works for CPU and GPU | |
| ret = aggs[0] | |
| for i in range(1, len(aggs)): | |
| # Works with numpy or cupy arrays | |
| np.maximum(ret, aggs[i], out=ret) | |
| return ret | |
| class _min_row_index(_max_or_min_row_index): | |
| """Min reduction operating on row index. | |
| This is a private class as it is not intended to be used explicitly in | |
| user code. Its primary purpose is to support the use of ``first`` | |
| reductions using dask and/or CUDA. | |
| """ | |
| def _antialias_requires_2_stages(self): | |
| return True | |
| def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: | |
| return (AntialiasStage2(AntialiasCombination.MIN, -1),) | |
| def uses_cuda_mutex(self) -> UsesCudaMutex: | |
| return UsesCudaMutex.Local | |
| # CPU append functions | |
| def _append(x, y, agg, field): | |
| # field is int64 row index | |
| if field != -1 and (agg[y, x] == -1 or field < agg[y, x]): | |
| agg[y, x] = field | |
| return 0 | |
| return -1 | |
| def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor): | |
| # field is int64 row index | |
| # Ignore aa_factor | |
| if field != -1 and (agg[y, x] == -1 or field < agg[y, x]): | |
| agg[y, x] = field | |
| return 0 | |
| return -1 | |
| # GPU append functions | |
| def _append_cuda(x, y, agg, field): | |
| # field is int64 row index | |
| # Always uses cuda mutex so this does not need to be atomic | |
| if field != -1 and (agg[y, x] == -1 or field < agg[y, x]): | |
| agg[y, x] = field | |
| return 0 | |
| return -1 | |
| def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): | |
| if cuda: | |
| return self._combine_cuda | |
| else: | |
| return self._combine | |
| def _combine(aggs): | |
| # Minimum ignoring -1 values | |
| ret = aggs[0] | |
| for i in range(1, len(aggs)): | |
| # Can take 2d (ny, nx) or 3d (ny, nx, ncat) arrays. | |
| row_min_in_place(ret, aggs[i]) | |
| return ret | |
| def _combine_cuda(aggs): | |
| ret = aggs[0] | |
| if len(aggs) > 1: | |
| if ret.ndim == 2: # ndim is either 2 (ny, nx) or 3 (ny, nx, ncat) | |
| # 3d view of each agg | |
| aggs = [cp.expand_dims(agg, 2) for agg in aggs] | |
| kernel_args = cuda_args(ret.shape[:3]) | |
| for i in range(1, len(aggs)): | |
| cuda_row_min_in_place[kernel_args](aggs[0], aggs[i]) | |
| return ret | |
| class _max_n_or_min_n_row_index(FloatingNReduction): | |
| """Abstract base class of max_n and min_n row_index reductions. | |
| """ | |
| def __init__(self, n=1): | |
| super().__init__(column=SpecialColumn.RowIndex) | |
| self.n = n if n >= 1 else 1 | |
| def out_dshape(self, in_dshape, antialias, cuda, partitioned): | |
| return dshape(ct.int64) | |
| def uses_cuda_mutex(self) -> UsesCudaMutex: | |
| return UsesCudaMutex.Local | |
| def uses_row_index(self, cuda, partitioned): | |
| return True | |
| def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): | |
| if cuda: | |
| return self._combine_cuda | |
| else: | |
| return self._combine | |
| class _max_n_row_index(_max_n_or_min_n_row_index): | |
| """Max_n reduction operating on row index. | |
| This is a private class as it is not intended to be used explicitly in | |
| user code. Its primary purpose is to support the use of ``last_n`` | |
| reductions using dask and/or CUDA. | |
| """ | |
| def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: | |
| return (AntialiasStage2(AntialiasCombination.MAX, -1, n_reduction=True),) | |
| def _append(x, y, agg, field): | |
| # field is int64 row index | |
| if field != -1: | |
| # Linear walk along stored values. | |
| # Could do binary search instead but not expecting n to be large. | |
| n = agg.shape[2] | |
| for i in range(n): | |
| if agg[y, x, i] == -1 or field > agg[y, x, i]: | |
| shift_and_insert(agg[y, x], field, i) | |
| return i | |
| return -1 | |
| def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor): | |
| # field is int64 row index | |
| # Ignoring aa_factor | |
| if field != -1: | |
| # Linear walk along stored values. | |
| # Could do binary search instead but not expecting n to be large. | |
| n = agg.shape[2] | |
| for i in range(n): | |
| if agg[y, x, i] == -1 or field > agg[y, x, i]: | |
| # Bump previous values along to make room for new value. | |
| for j in range(n-1, i, -1): | |
| agg[y, x, j] = agg[y, x, j-1] | |
| agg[y, x, i] = field | |
| return i | |
| return -1 | |
| # GPU append functions | |
| def _append_cuda(x, y, agg, field): | |
| # field is int64 row index | |
| # Always uses cuda mutex so this does not need to be atomic | |
| if field != -1: | |
| # Linear walk along stored values. | |
| # Could do binary search instead but not expecting n to be large. | |
| n = agg.shape[2] | |
| for i in range(n): | |
| if agg[y, x, i] == -1 or field > agg[y, x, i]: | |
| cuda_shift_and_insert(agg[y, x], field, i) | |
| return i | |
| return -1 | |
| def _combine(aggs): | |
| ret = aggs[0] | |
| if len(aggs) > 1: | |
| if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) | |
| row_max_n_in_place_3d(aggs[0], aggs[1]) | |
| else: | |
| row_max_n_in_place_4d(aggs[0], aggs[1]) | |
| return ret | |
| def _combine_cuda(aggs): | |
| ret = aggs[0] | |
| if len(aggs) > 1: | |
| kernel_args = cuda_args(ret.shape[:-1]) | |
| if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) | |
| cuda_row_max_n_in_place_3d[kernel_args](aggs[0], aggs[1]) | |
| else: | |
| cuda_row_max_n_in_place_4d[kernel_args](aggs[0], aggs[1]) | |
| return ret | |
| class _min_n_row_index(_max_n_or_min_n_row_index): | |
| """Min_n reduction operating on row index. | |
| This is a private class as it is not intended to be used explicitly in | |
| user code. Its primary purpose is to support the use of ``first_n`` | |
| reductions using dask and/or CUDA. | |
| """ | |
| def _antialias_requires_2_stages(self): | |
| return True | |
| def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: | |
| return (AntialiasStage2(AntialiasCombination.MIN, -1, n_reduction=True),) | |
| def _append(x, y, agg, field): | |
| # field is int64 row index | |
| if field != -1: | |
| # Linear walk along stored values. | |
| # Could do binary search instead but not expecting n to be large. | |
| n = agg.shape[2] | |
| for i in range(n): | |
| if agg[y, x, i] == -1 or field < agg[y, x, i]: | |
| shift_and_insert(agg[y, x], field, i) | |
| return i | |
| return -1 | |
| def _append_antialias(x, y, agg, field, aa_factor, prev_aa_factor): | |
| # field is int64 row index | |
| # Ignoring aa_factor | |
| if field != -1: | |
| # Linear walk along stored values. | |
| # Could do binary search instead but not expecting n to be large. | |
| n = agg.shape[2] | |
| for i in range(n): | |
| if agg[y, x, i] == -1 or field < agg[y, x, i]: | |
| shift_and_insert(agg[y, x], field, i) | |
| return i | |
| return -1 | |
| def _append_cuda(x, y, agg, field): | |
| # field is int64 row index | |
| # Always uses cuda mutex so this does not need to be atomic | |
| if field != -1: | |
| # Linear walk along stored values. | |
| # Could do binary search instead but not expecting n to be large. | |
| n = agg.shape[2] | |
| for i in range(n): | |
| if agg[y, x, i] == -1 or field < agg[y, x, i]: | |
| cuda_shift_and_insert(agg[y, x], field, i) | |
| return i | |
| return -1 | |
| def _combine(aggs): | |
| ret = aggs[0] | |
| if len(aggs) > 1: | |
| if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) | |
| row_min_n_in_place_3d(aggs[0], aggs[1]) | |
| else: | |
| row_min_n_in_place_4d(aggs[0], aggs[1]) | |
| return ret | |
| def _combine_cuda(aggs): | |
| ret = aggs[0] | |
| if len(aggs) > 1: | |
| kernel_args = cuda_args(ret.shape[:-1]) | |
| if ret.ndim == 3: # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) | |
| cuda_row_min_n_in_place_3d[kernel_args](aggs[0], aggs[1]) | |
| else: | |
| cuda_row_min_n_in_place_4d[kernel_args](aggs[0], aggs[1]) | |
| return ret | |
| __all__ = list(set([_k for _k,_v in locals().items() | |
| if isinstance(_v,type) and (issubclass(_v,Reduction) or _v is summary) | |
| and _v not in [Reduction, OptionalFieldReduction, | |
| FloatingReduction, m2]])) + \ | |
| ['category_modulo', 'category_binning'] | |