from __future__ import annotations

from itertools import count
import logging
from typing import TYPE_CHECKING

from toolz import unique, concat, pluck, get, memoize
from numba import literal_unroll
import numpy as np
import xarray as xr

from .antialias import AntialiasCombination
from .reductions import SpecialColumn, UsesCudaMutex, by, category_codes, summary
from .utils import (isnull, ngjit,
                    nanmax_in_place, nanmin_in_place, nansum_in_place,
                    nanfirst_in_place, nanlast_in_place,
                    nanmax_n_in_place_3d, nanmax_n_in_place_4d,
                    nanmin_n_in_place_3d, nanmin_n_in_place_4d,
                    nanfirst_n_in_place_3d, nanfirst_n_in_place_4d,
                    nanlast_n_in_place_3d, nanlast_n_in_place_4d,
                    row_min_in_place, row_min_n_in_place_3d, row_min_n_in_place_4d,
                    row_max_in_place, row_max_n_in_place_3d, row_max_n_in_place_4d,
                    )

try:
    from datashader.transfer_functions._cuda_utils import cuda_mutex_lock, cuda_mutex_unlock
except ImportError:
    cuda_mutex_lock, cuda_mutex_unlock = None, None

if TYPE_CHECKING:
    from datashader.antialias import UnzippedAntialiasStage2


__all__ = ['compile_components']


logger = logging.getLogger(__name__)
@memoize
def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, partitioned=False):
    """Given an ``Aggregation`` object and a schema, return the five
    sub-functions needed to compute the aggregation, along with supporting
    information such as how to perform the second-stage aggregation if
    antialiasing is requested.

    Parameters
    ----------
    agg : Aggregation
        The expression describing the aggregation(s) to be computed.
    schema : DataShape
        Columns and dtypes in the source dataset.
    glyph : Glyph
        The glyph to render.
    antialias : bool
        Whether to render using antialiasing.
    cuda : bool
        Whether to render using CUDA (on the GPU) or the CPU.
    partitioned : bool
        Whether the source dataset is partitioned using dask.

    Returns
    -------
    A tuple of the following:

    ``create(shape)``
        Function that takes the aggregate shape and returns a tuple of
        initialized numpy arrays.
    ``info(df, canvas_shape)``
        Function that takes a dataframe and returns preprocessed 1D numpy
        arrays of the needed columns.
    ``append(i, x, y, *aggs_and_cols)``
        Function that appends the ``i``th row of the table to the ``(x, y)``
        bin, given the base arrays and columns in ``aggs_and_cols``. This does
        the bulk of the work.
    ``combine(base_tuples)``
        Function that combines a list of base tuples into a single base tuple.
        This forms the reducing step in a reduction tree.
    ``finalize(aggs, cuda)``
        Function that is given a tuple of base numpy arrays and returns the
        finalized ``DataArray`` or ``Dataset``.
    ``antialias_stage_2``
        If using antialiased lines this is a tuple of the
        ``AntialiasCombination`` values corresponding to the aggs. If not
        using antialiased lines then this is ``False``.
    ``antialias_stage_2_funcs``
        If using antialiased lines that require a second-stage combine, this
        is a tuple of the three combine functions: the accumulate, clear and
        copy_back functions. If not using antialiased lines then this is
        ``None``.
    ``column_names``
        Names of the DataFrame columns or DataArray variables that are used
        by the agg.
    """
    reds = list(traverse_aggregation(agg))

    # List of base reductions (actually computed)
    bases = list(unique(concat(r._build_bases(cuda, partitioned) for r in reds)))
    dshapes = [b.out_dshape(schema, antialias, cuda, partitioned) for b in bases]

    # Information on how to perform second stage aggregation of antialiased lines,
    # including whether antialiased lines self-intersect or not. We need a single
    # value for this even for a compound reduction: it defaults to True, but is
    # False if any constituent reduction requests it.
    if antialias:
        self_intersect, antialias_stage_2 = make_antialias_stage_2(reds, bases)
        if cuda:
            import cupy
            array_module = cupy
        else:
            array_module = np
        antialias_stage_2 = antialias_stage_2(array_module)
        antialias_stage_2_funcs = make_antialias_stage_2_functions(antialias_stage_2, bases,
                                                                   cuda, partitioned)
    else:
        self_intersect = False
        antialias_stage_2 = False
        antialias_stage_2_funcs = None

    # List of tuples of
    # (append, bases, cols, nan_check_column, temps, combine_temps,
    #  uses_cuda_mutex, is_categorical)
    calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect, partitioned)
             for (b, d) in zip(bases, dshapes)]

    # List of unique columns needed, including nan_check_columns
    cols = list(concat(pluck(2, calls)))
    nan_check_cols = list(c[3] for c in calls if c[3] is not None)
    cols = list(unique(cols + nan_check_cols))

    # List of temps needed
    temps = list(pluck(4, calls))
    combine_temps = list(pluck(5, calls))

    create = make_create(bases, dshapes, cuda)
    append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
    info = make_info(cols, cuda, any_uses_cuda_mutex)
    combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
    finalize = make_finalize(bases, agg, schema, cuda, partitioned)
    column_names = [c.column for c in cols if c.column != SpecialColumn.RowIndex]

    return create, info, append, combine, finalize, antialias_stage_2, \
        antialias_stage_2_funcs, column_names
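
# A rough sketch of how the pieces returned by compile_components() fit
# together (a hypothetical single-partition flow; the real drivers live in
# the glyph and data-library modules, not here):
#
#     create, info, append, combine, finalize, *_ = compile_components(
#         agg, schema, glyph, antialias=False, cuda=False, partitioned=False)
#     aggs = create((canvas_height, canvas_width))    # zero-initialized arrays
#     cols = info(df, (canvas_height, canvas_width))  # preprocessed 1D columns
#     # ... glyph-specific code calls append(i, x, y, *aggs, *cols) per row ...
#     combined = combine([aggs])                      # reduction-tree step
#     result = finalize(combined, cuda=False)         # -> DataArray / Dataset
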
def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero: float,
                                        n_reduction: bool, categorical: bool):
    if n_reduction:
        if zero == -1:
            if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST):
                return row_max_n_in_place_4d if categorical else row_max_n_in_place_3d
            elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST):
                return row_min_n_in_place_4d if categorical else row_min_n_in_place_3d
            else:
                raise NotImplementedError
        else:
            if combination == AntialiasCombination.MAX:
                return nanmax_n_in_place_4d if categorical else nanmax_n_in_place_3d
            elif combination == AntialiasCombination.MIN:
                return nanmin_n_in_place_4d if categorical else nanmin_n_in_place_3d
            elif combination == AntialiasCombination.FIRST:
                return nanfirst_n_in_place_4d if categorical else nanfirst_n_in_place_3d
            elif combination == AntialiasCombination.LAST:
                return nanlast_n_in_place_4d if categorical else nanlast_n_in_place_3d
            else:
                raise NotImplementedError
    else:
        # The aggs to combine here are either 3D (ny, nx, ncat) if categorical is True or
        # 2D (ny, nx) if categorical is False. The same combination functions can be used
        # for both as all elements are independent.
        if zero == -1:
            if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST):
                return row_max_in_place
            elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST):
                return row_min_in_place
            else:
                raise NotImplementedError
        else:
            if combination == AntialiasCombination.MAX:
                return nanmax_in_place
            elif combination == AntialiasCombination.MIN:
                return nanmin_in_place
            elif combination == AntialiasCombination.FIRST:
                return nanfirst_in_place
            elif combination == AntialiasCombination.LAST:
                return nanlast_in_place
            else:
                return nansum_in_place
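
# Note on the dispatch above: a ``zero`` of -1 identifies aggregates that hold
# integer row indexes, whose missing value is -1 rather than NaN, so they use
# the row_* combiners instead of the nan-aware ones (inferred from the
# dispatch logic, not stated elsewhere in this module).
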
def make_antialias_stage_2_functions(antialias_stage_2, bases, cuda, partitioned):
    aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical = antialias_stage_2

    # Accumulate functions.
    funcs = [_get_antialias_stage_2_combine_func(comb, zero, n_red, cat)
             for comb, zero, n_red, cat
             in zip(aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical)]

    base_is_where = [b.is_where() for b in bases]
    next_base_is_where = base_is_where[1:] + [False]

    namespace = {}
    namespace["literal_unroll"] = literal_unroll
    for func in set(funcs):
        namespace[func.__name__] = func

    # Generator of unique names for combine functions
    names = (f"combine{i}" for i in count())

    # aa_stage_2_accumulate
    lines = [
        "def aa_stage_2_accumulate(aggs_and_copies, first_pass):",
        # Don't need to accumulate if first_pass, just copy (opposite of aa_stage_2_copy_back)
        "    if first_pass:",
        "        for a in literal_unroll(aggs_and_copies):",
        "            a[1][:] = a[0][:]",
        "    else:",
    ]
    for i, (func, is_where, next_is_where) in enumerate(zip(funcs, base_is_where,
                                                            next_base_is_where)):
        if is_where:
            where_reduction = bases[i]
            if isinstance(where_reduction, by):
                where_reduction = where_reduction.reduction

            combine = where_reduction._combine_callback(cuda, partitioned, aa_categorical[i])
            name = next(names)  # Unique name
            namespace[name] = combine

            lines.append(
                f"        {name}(aggs_and_copies[{i}][::-1], aggs_and_copies[{i-1}][::-1])")
        elif next_is_where:
            # This is dealt with as part of the following base which is a where reduction.
            pass
        else:
            lines.append(
                f"        {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])")
    code = "\n".join(lines)
    logger.debug(code)
    exec(code, namespace)
    aa_stage_2_accumulate = ngjit(namespace["aa_stage_2_accumulate"])

    # aa_stage_2_clear
    if np.any(np.isnan(aa_zeroes)):
        namespace["nan"] = np.nan

    lines = ["def aa_stage_2_clear(aggs_and_copies):"]
    for i, aa_zero in enumerate(aa_zeroes):
        lines.append(f"    aggs_and_copies[{i}][0].fill({aa_zero})")
    code = "\n".join(lines)
    logger.debug(code)
    exec(code, namespace)
    aa_stage_2_clear = ngjit(namespace["aa_stage_2_clear"])
    # aa_stage_2_copy_back
    @ngjit
    def aa_stage_2_copy_back(aggs_and_copies):
        # Numba access to heterogeneous tuples is only permitted using literal_unroll.
        for agg_and_copy in literal_unroll(aggs_and_copies):
            agg_and_copy[0][:] = agg_and_copy[1][:]

    return aa_stage_2_accumulate, aa_stage_2_clear, aa_stage_2_copy_back
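
# For illustration, for two non-categorical, non-where reductions combined
# with MAX and MIN, the string templates above generate source equivalent to:
#
#     def aa_stage_2_accumulate(aggs_and_copies, first_pass):
#         if first_pass:
#             for a in literal_unroll(aggs_and_copies):
#                 a[1][:] = a[0][:]
#         else:
#             nanmax_in_place(aggs_and_copies[0][1], aggs_and_copies[0][0])
#             nanmin_in_place(aggs_and_copies[1][1], aggs_and_copies[1][0])
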
def traverse_aggregation(agg):
    """Yield a left->right traversal of an aggregation"""
    if isinstance(agg, summary):
        for a in agg.values:
            yield from traverse_aggregation(a)
    else:
        yield agg
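
# For example (hypothetical aggregation, using datashader's usual ``ds``
# import alias):
#     list(traverse_aggregation(summary(m=ds.mean("z"), c=ds.count())))
# yields the mean reduction followed by the count reduction, while a bare
# (non-summary) reduction is yielded unchanged.
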
def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect, partitioned):
    # Comments refer to usage in make_append()
    return (
        base._build_append(dshape, schema, cuda, antialias, self_intersect),  # func
        (base,),  # bases
        base.inputs,  # cols, arrays of these are passed to reduction append functions
        base.nan_check_column,  # column used to check for NaNs in some where reductions
        base._build_temps(cuda),  # temps
        base._build_combine_temps(cuda, partitioned),  # combine temps
        base.uses_cuda_mutex() if cuda else UsesCudaMutex.No,  # uses cuda mutex
        base.is_categorical(),
    )
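
# These tuple indices are relied upon by the callers: compile_components()
# reads cols via pluck(2, calls) and nan_check_column via call[3], temps via
# pluck(4, calls) and combine temps via pluck(5, calls), while make_append()
# reads the cuda mutex flag via call[6].
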
def make_create(bases, dshapes, cuda):
    creators = [b._build_create(d) for (b, d) in zip(bases, dshapes)]
    if cuda:
        import cupy
        array_module = cupy
    else:
        array_module = np
    return lambda shape: tuple(c(shape, array_module) for c in creators)
def make_info(cols, cuda, uses_cuda_mutex: bool):
    def info(df, canvas_shape):
        ret = tuple(c.apply(df, cuda) for c in cols)
        if uses_cuda_mutex:
            import cupy  # Guaranteed to be available if uses_cuda_mutex is True
            import numba
            from packaging.version import Version

            if Version(numba.__version__) >= Version("0.57"):
                # One mutex per canvas pixel.
                mutex_array = cupy.zeros(canvas_shape, dtype=np.uint32)
            else:
                # Single shared mutex on older numba versions.
                mutex_array = cupy.zeros((1,), dtype=np.uint32)
            ret += (mutex_array,)
        return ret
    return info
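
# info() returns one preprocessed array per reduction input column; when a
# reduction needs the cuda mutex, the mutex array is appended last, matching
# the extra "_cuda_mutex" argument added to the generated append() function
# in make_append() below.
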
def make_append(bases, cols, calls, glyph, antialias):
    names = ('_{0}'.format(i) for i in count())
    inputs = list(bases) + list(cols)
    namespace = {}
    need_isnull = any(call[3] for call in calls)
    if need_isnull:
        namespace["isnull"] = isnull
    global_cuda_mutex = any(call[6] == UsesCudaMutex.Global for call in calls)
    any_uses_cuda_mutex = any(call[6] != UsesCudaMutex.No for call in calls)
    if any_uses_cuda_mutex:
        # This adds an argument to the append() function that is the cuda mutex
        # generated in make_info.
        inputs += ["_cuda_mutex"]
        namespace["cuda_mutex_lock"] = cuda_mutex_lock
        namespace["cuda_mutex_unlock"] = cuda_mutex_unlock
    signature = [next(names) for i in inputs]
    arg_lk = dict(zip(inputs, signature))
    local_lk = {}
    head = []
    body = []
    ndims = glyph.ndims
    if ndims is not None:
        subscript = ', '.join(['i' + str(n) for n in range(ndims)])
    else:
        subscript = None
    prev_local_cuda_mutex = False
    categorical_args = {}  # Reuse categorical arguments if used in more than one reduction
    where_selectors = {}  # Reuse where.selector if used more than once in a summary reduction

    if logger.isEnabledFor(logging.DEBUG):
        # Log the argument lookup table used to build the append() source.
        logger.debug(f"global_cuda_mutex {global_cuda_mutex}")
        logger.debug(f"any_uses_cuda_mutex {any_uses_cuda_mutex}")
        for k, v in arg_lk.items():
            logger.debug(f"arg_lk {v} {k} {getattr(k, 'column', None)}")

    def get_cuda_mutex_call(lock: bool) -> str:
        func = "cuda_mutex_lock" if lock else "cuda_mutex_unlock"
        return f'{func}({arg_lk["_cuda_mutex"]}, (y, x))'

    for index, (func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex, categorical) \
            in enumerate(calls):
        local_cuda_mutex = not global_cuda_mutex and uses_cuda_mutex == UsesCudaMutex.Local
        local_lk.update(zip(temps, (next(names) for i in temps)))
        func_name = next(names)
        logger.debug(f"func {func_name} {func}")
        namespace[func_name] = func
        args = [arg_lk[i] for i in bases]
        if categorical and isinstance(cols[0], category_codes):
            args.extend('{0}[{1}]'.format(arg_lk[col], subscript) for col in cols[1:])
        elif ndims is None:
            args.extend('{0}'.format(arg_lk[i]) for i in cols)
        elif categorical:
            args.extend('{0}[{1}][1]'.format(arg_lk[i], subscript) for i in cols)
        else:
            args.extend('{0}[{1}]'.format(arg_lk[i], subscript) for i in cols)

        if categorical:
            # Categorical aggregate arrays need to be unpacked
            categorical_arg = arg_lk[cols[0]]
            cat_name = categorical_args.get(categorical_arg, None)
            if cat_name is None:
                # Each categorical column only needs to be unpacked once
                col_index = '' if isinstance(cols[0], category_codes) else '[0]'
                cat_name = f'cat{next(names)}'
                categorical_args[categorical_arg] = cat_name
                head.append(f'{cat_name} = int({categorical_arg}[{subscript}]{col_index})')
            arg = signature[index]
            head.append(f'{arg} = {arg}[:, :, {cat_name}]')

        args.extend([local_lk[i] for i in temps])
        if antialias:
            args += ["aa_factor", "prev_aa_factor"]

        if local_cuda_mutex and prev_local_cuda_mutex:
            # Avoid unnecessary mutex unlock and lock cycle
            body.pop()

        is_where = len(bases) == 1 and bases[0].is_where()
        if is_where:
            where_reduction = bases[0]
            if isinstance(where_reduction, by):
                where_reduction = where_reduction.reduction
            selector_hash = hash(where_reduction.selector)
            update_index_arg_name = where_selectors.get(selector_hash, None)
            new_selector = update_index_arg_name is None
            if new_selector:
                update_index_arg_name = next(names)
                where_selectors[selector_hash] = update_index_arg_name
            args.append(update_index_arg_name)

            # where reduction needs access to the return of the contained
            # reduction, which is the preceding one here.
            prev_body = body.pop()
            if local_cuda_mutex and not prev_local_cuda_mutex:
                body.append(get_cuda_mutex_call(True))
            if new_selector:
                body.append(f'{update_index_arg_name} = {prev_body}')
            else:
                body.append(prev_body)

            # If nan_check_column is defined then need to check if value of
            # correct row in that column is NaN and if so do nothing. This
            # check needs to occur before the where.selector is called.
            if nan_check_column is None:
                whitespace = ''
            else:
                var = f"{arg_lk[nan_check_column]}[{subscript}]"
                prev_body = body[-1]
                body[-1] = f'if not isnull({var}):'
                body.append(f'    {prev_body}')
                whitespace = '    '

            body.append(f'{whitespace}if {update_index_arg_name} >= 0:')
            body.append(f'    {whitespace}{func_name}(x, y, {", ".join(args)})')
        else:
            if local_cuda_mutex and not prev_local_cuda_mutex:
                body.append(get_cuda_mutex_call(True))
            if nan_check_column:
                var = f"{arg_lk[nan_check_column]}[{subscript}]"
                body.append(f'if not isnull({var}):')
                body.append(f'    {func_name}(x, y, {", ".join(args)})')
            else:
                body.append(f'{func_name}(x, y, {", ".join(args)})')

        if local_cuda_mutex:
            body.append(get_cuda_mutex_call(False))
        prev_local_cuda_mutex = local_cuda_mutex

    body = head + ['{0} = {1}[y, x]'.format(name, arg_lk[agg])
                   for agg, name in local_lk.items()] + body
    if global_cuda_mutex:
        body = [get_cuda_mutex_call(True)] + body + [get_cuda_mutex_call(False)]

    if antialias:
        signature = ["aa_factor", "prev_aa_factor"] + signature

    if ndims is None:
        code = ('def append(x, y, {0}):\n'
                '    {1}').format(', '.join(signature), '\n    '.join(body))
    else:
        code = ('def append({0}, x, y, {1}):\n'
                '    {2}'
                ).format(subscript, ', '.join(signature), '\n    '.join(body))
    logger.debug(code)
    exec(code, namespace)
    return ngjit(namespace['append']), any_uses_cuda_mutex
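
# Rough shape of the generated source for a single non-categorical reduction
# on a 1D glyph, with no where/nan-check/mutex handling (illustrative only;
# the actual names come from the generators above):
#
#     def append(i0, x, y, _0, _1):
#         _2(x, y, _0, _1[i0])
#
# where _0 is the aggregate array, _1 the input column, and _2 the reduction's
# compiled append function looked up from the namespace.
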
def make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned):
    # Lookup of base Reduction to argument index.
    arg_lk = dict((k, v) for (v, k) in enumerate(bases))

    # Also need lookup of by.reduction as the contained reduction is not aware of its wrapper.
    arg_lk.update(dict((k.reduction, v) for (v, k) in enumerate(bases) if isinstance(k, by)))

    # where._combine() deals with combine of preceding reduction so exclude
    # it from explicit combine calls.
    base_is_where = [b.is_where() for b in bases]
    next_base_is_where = base_is_where[1:] + [False]
    calls = [(None if n else b._build_combine(d, antialias, cuda, partitioned),
              [arg_lk[i] for i in (b,) + t + ct])
             for (b, d, t, ct, n) in zip(bases, dshapes, temps, combine_temps,
                                         next_base_is_where)]

    def combine(base_tuples):
        bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
        ret = []
        for is_where, (func, inds) in zip(base_is_where, calls):
            if func is None:
                continue
            call = func(*get(inds, bases))
            if is_where:
                # Separate aggs of where reduction and its selector; the
                # selector's comes first to match the order of bases.
                ret.extend(call[::-1])
            else:
                ret.append(call)
        return tuple(ret)

    return combine
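
# Sketch: given base tuples from two dask partitions, combine() stacks the
# corresponding aggregate arrays along a new leading axis and reduces each
# stack back to a single array, e.g.
#     combine([(agg_a0, agg_b0), (agg_a1, agg_b1)]) -> (agg_a, agg_b)
# where the names are hypothetical per-partition aggregate arrays.
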
def make_finalize(bases, agg, schema, cuda, partitioned):
    arg_lk = dict((k, v) for (v, k) in enumerate(bases))
    if isinstance(agg, summary):
        calls = []
        for key, val in zip(agg.keys, agg.values):
            f = make_finalize(bases, val, schema, cuda, partitioned)
            try:
                # Override bases if possible
                bases = val._build_bases(cuda, partitioned)
            except AttributeError:
                pass
            inds = [arg_lk[b] for b in bases]
            calls.append((key, f, inds))

        def finalize(bases, cuda=False, **kwargs):
            data = {key: finalizer(get(inds, bases), cuda, **kwargs)
                    for (key, finalizer, inds) in calls}

            # Copy x and y range attrs from any DataArray (their ranges are all the same)
            # to set on the parent Dataset.
            name = agg.keys[0]  # Name of first DataArray.
            attrs = {attr: data[name].attrs[attr] for attr in ('x_range', 'y_range')}

            return xr.Dataset(data, attrs=attrs)
        return finalize
    else:
        return agg._build_finalize(schema)
def make_antialias_stage_2(reds, bases):
    # Only called if antialias is True.

    # Prefer a single-stage antialiased aggregation, but if any requested
    # reduction requires two stages then force the use of two stages for all
    # reductions.
    self_intersect = True
    for red in reds:
        if red._antialias_requires_2_stages():
            self_intersect = False
            break

    def antialias_stage_2(array_module) -> UnzippedAntialiasStage2:
        return tuple(zip(*concat(b._antialias_stage_2(self_intersect, array_module)
                                 for b in bases)))

    return self_intersect, antialias_stage_2