from __future__ import annotations
import functools
import math
import operator
from collections import defaultdict
from collections.abc import Callable
from itertools import product
from typing import Any
import tlz as toolz
from tlz.curried import map
from dask.base import tokenize
from dask.blockwise import Blockwise, BlockwiseDep, BlockwiseDepDict, blockwise_token
from dask.core import flatten, keys_in_tasks
from dask.highlevelgraph import Layer
from dask.utils import (
apply,
cached_cumsum,
concrete,
insert,
stringify,
stringify_collection_keys,
)
#
##
### General Utilities
##
#
class CallableLazyImport:
"""Function Wrapper for Lazy Importing.
This Class should only be used when materializing a graph
on a distributed scheduler.
"""
def __init__(self, function_path):
self.function_path = function_path
def __call__(self, *args, **kwargs):
from distributed.utils import import_term
return import_term(self.function_path)(*args, **kwargs)
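# Illustrative usage (assumes ``distributed`` is installed, since ``__call__``
# imports ``distributed.utils.import_term``):
# ``CallableLazyImport("operator.add")(1, 2)`` imports ``operator.add`` at
# call time and returns ``3``.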
#
##
### Array Layers & Utilities
##
#
class ArrayBlockwiseDep(BlockwiseDep):
"""
Blockwise dep for array-likes, which only needs chunking
information to compute its data.
"""
chunks: tuple[tuple[int, ...], ...]
numblocks: tuple[int, ...]
produces_tasks: bool = False
def __init__(self, chunks: tuple[tuple[int, ...], ...]):
self.chunks = chunks
self.numblocks = tuple(len(chunk) for chunk in chunks)
self.produces_tasks = False
def __getitem__(self, idx: tuple[int, ...]):
raise NotImplementedError("Subclasses must implement __getitem__")
def __dask_distributed_pack__(
self, required_indices: list[tuple[int, ...]] | None = None
):
return {"chunks": self.chunks}
@classmethod
def __dask_distributed_unpack__(cls, state):
return cls(**state)
class ArrayChunkShapeDep(ArrayBlockwiseDep):
"""Produce chunk shapes given a chunk index"""
def __getitem__(self, idx: tuple[int, ...]):
return tuple(chunk[i] for i, chunk in zip(idx, self.chunks))
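# Illustrative example (hypothetical chunks): for chunks ``((2, 3), (4,))``,
# ``ArrayChunkShapeDep(((2, 3), (4,)))[(1, 0)]`` returns ``(3, 4)``, the shape
# of the block at index ``(1, 0)``.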
class ArraySliceDep(ArrayBlockwiseDep):
"""Produce slice(s) into the full-sized array given a chunk index"""
starts: tuple[tuple[int, ...], ...]
def __init__(self, chunks: tuple[tuple[int, ...], ...]):
super().__init__(chunks)
self.starts = tuple(cached_cumsum(c, initial_zero=True) for c in chunks)
def __getitem__(self, idx: tuple):
loc = tuple((start[i], start[i + 1]) for i, start in zip(idx, self.starts))
return tuple(slice(*s, None) for s in loc)
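# Illustrative example (hypothetical chunks): for chunks ``((2, 3), (4,))``,
# ``ArraySliceDep(((2, 3), (4,)))[(1, 0)]`` returns
# ``(slice(2, 5, None), slice(0, 4, None))``, the slices that recover block
# ``(1, 0)`` from the full-sized array.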
class ArrayOverlapLayer(Layer):
"""Simple HighLevelGraph array overlap layer.
Lazily computed High-level graph layer for a array overlap operations.
Parameters
----------
name : str
Name of new output overlap array.
array : Dask array
axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{0: 1, 1: 1}``
"""
def __init__(
self,
name,
axes,
chunks,
numblocks,
token,
):
super().__init__()
self.name = name
self.axes = axes
self.chunks = chunks
self.numblocks = numblocks
self.token = token
self._cached_keys = None
def __repr__(self):
return f"ArrayOverlapLayer<name='{self.name}'"
@property
def _dict(self):
"""Materialize full dict representation"""
if hasattr(self, "_cached_dict"):
return self._cached_dict
else:
dsk = self._construct_graph()
self._cached_dict = dsk
return self._cached_dict
def __getitem__(self, key):
return self._dict[key]
def __iter__(self):
return iter(self._dict)
def __len__(self):
return len(self._dict)
def is_materialized(self):
return hasattr(self, "_cached_dict")
def get_output_keys(self):
return self.keys() # FIXME! this implementation materializes the graph
def _dask_keys(self):
if self._cached_keys is not None:
return self._cached_keys
name, chunks, numblocks = self.name, self.chunks, self.numblocks
def keys(*args):
if not chunks:
return [(name,)]
ind = len(args)
if ind + 1 == len(numblocks):
result = [(name,) + args + (i,) for i in range(numblocks[ind])]
else:
result = [keys(*(args + (i,))) for i in range(numblocks[ind])]
return result
self._cached_keys = result = keys()
return result
def _construct_graph(self, deserializing=False):
"""Construct graph for a simple overlap operation."""
axes = self.axes
chunks = self.chunks
name = self.name
dask_keys = self._dask_keys()
getitem_name = "getitem-" + self.token
overlap_name = "overlap-" + self.token
if deserializing:
# Use CallableLazyImport objects to avoid importing dataframe
# module on the scheduler
concatenate3 = CallableLazyImport("dask.array.core.concatenate3")
else:
# Not running on distributed scheduler - Use explicit functions
from dask.array.core import concatenate3
dims = list(map(len, chunks))
expand_key2 = functools.partial(
_expand_keys_around_center, dims=dims, axes=axes
)
# Make keys for each of the surrounding sub-arrays
interior_keys = toolz.pipe(
dask_keys, flatten, map(expand_key2), map(flatten), toolz.concat, list
)
interior_slices = {}
overlap_blocks = {}
for k in interior_keys:
frac_slice = fractional_slice((name,) + k, axes)
if (name,) + k != frac_slice:
interior_slices[(getitem_name,) + k] = frac_slice
else:
interior_slices[(getitem_name,) + k] = (name,) + k
overlap_blocks[(overlap_name,) + k] = (
concatenate3,
(concrete, expand_key2((None,) + k, name=getitem_name)),
)
dsk = toolz.merge(interior_slices, overlap_blocks)
return dsk
@classmethod
def __dask_distributed_unpack__(cls, state):
return cls(**state)._construct_graph(deserializing=True)
def _expand_keys_around_center(k, dims, name=None, axes=None):
"""Get all neighboring keys around center
Parameters
----------
k: tuple
        The key around which to generate new keys
dims: Sequence[int]
The number of chunks in each dimension
    name: Optional[str]
        The name to include in the output keys, or None to include no name
axes: Dict[int, int]
The axes active in the expansion. We don't expand on non-active axes
Examples
--------
>>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1}) # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
[[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
[('y', 2, 2.1), ('y', 2, 3), ('y', 2, 3.9)],
[('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]
>>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1}) # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
[[('y', 0, 3.1), ('y', 0, 4)],
[('y', 0.9, 3.1), ('y', 0.9, 4)]]
"""
def inds(i, ind):
rv = []
if ind - 0.9 > 0:
rv.append(ind - 0.9)
rv.append(ind)
if ind + 0.9 < dims[i] - 1:
rv.append(ind + 0.9)
return rv
shape = []
for i, ind in enumerate(k[1:]):
num = 1
if ind > 0:
num += 1
if ind < dims[i] - 1:
num += 1
shape.append(num)
args = [
inds(i, ind) if any((axes.get(i, 0),)) else [ind] for i, ind in enumerate(k[1:])
]
if name is not None:
args = [[name]] + args
seq = list(product(*args))
shape2 = [d if any((axes.get(i, 0),)) else 1 for i, d in enumerate(shape)]
result = reshapelist(shape2, seq)
return result
def reshapelist(shape, seq):
"""Reshape iterator to nested shape
>>> reshapelist((2, 3), range(6))
[[0, 1, 2], [3, 4, 5]]
"""
if len(shape) == 1:
return list(seq)
else:
n = int(len(seq) / shape[0])
return [reshapelist(shape[1:], part) for part in toolz.partition(n, seq)]
def fractional_slice(task, axes):
"""
>>> fractional_slice(('x', 5.1), {0: 2})
(<built-in function getitem>, ('x', 5), (slice(-2, None, None),))
>>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
(<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))
>>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
(<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
"""
rounded = (task[0],) + tuple(int(round(i)) for i in task[1:])
index = []
for i, (t, r) in enumerate(zip(task[1:], rounded[1:])):
depth = axes.get(i, 0)
if isinstance(depth, tuple):
left_depth = depth[0]
right_depth = depth[1]
else:
left_depth = depth
right_depth = depth
if t == r:
index.append(slice(None, None, None))
elif t < r and right_depth:
index.append(slice(0, right_depth))
elif t > r and left_depth:
index.append(slice(-left_depth, None))
else:
index.append(slice(0, 0))
index = tuple(index)
if all(ind == slice(None, None, None) for ind in index):
return task
else:
return (operator.getitem, rounded, index)
#
##
### DataFrame Layers & Utilities
##
#
class SimpleShuffleLayer(Layer):
"""Simple HighLevelGraph Shuffle layer
High-level graph layer for a simple shuffle operation in which
each output partition depends on all input partitions.
Parameters
----------
name : str
Name of new shuffled output collection.
column : str or list of str
Column(s) to be used to map rows to output partitions (by hashing).
npartitions : int
Number of output partitions.
npartitions_input : int
Number of partitions in the original (un-shuffled) DataFrame.
ignore_index: bool, default False
Ignore index during shuffle. If ``True``, performance may improve,
but index values will not be preserved.
name_input : str
Name of input collection.
meta_input : pd.DataFrame-like object
Empty metadata of input collection.
parts_out : list of int (optional)
List of required output-partition indices.
annotations : dict (optional)
Layer annotations
"""
def __init__(
self,
name,
column,
npartitions,
npartitions_input,
ignore_index,
name_input,
meta_input,
parts_out=None,
annotations=None,
):
super().__init__(annotations=annotations)
self.name = name
self.column = column
self.npartitions = npartitions
self.npartitions_input = npartitions_input
self.ignore_index = ignore_index
self.name_input = name_input
self.meta_input = meta_input
self.parts_out = parts_out or range(npartitions)
self.split_name = "split-" + self.name
# The scheduling policy of Dask is generally depth-first,
# which works great in most cases. However, in case of shuffle,
# it increases the memory usage significantly. This is because
# depth-first delays the freeing of the result of `shuffle_group()`
# until the end of the shuffling.
#
# We address this by manually setting a high "priority" to the
# `getitem()` ("split") tasks, using annotations. This forces a
# breadth-first scheduling of the tasks that directly depend on
# the `shuffle_group()` output, allowing that data to be freed
# much earlier.
#
# See https://github.com/dask/dask/pull/6051 for a detailed discussion.
self.annotations = self.annotations or {}
if "priority" not in self.annotations:
self.annotations["priority"] = {}
self.annotations["priority"]["__expanded_annotations__"] = None
self.annotations["priority"].update({_key: 1 for _key in self.get_split_keys()})
def get_split_keys(self):
# Return SimpleShuffleLayer "split" keys
return [
stringify((self.split_name, part_out, part_in))
for part_in in range(self.npartitions_input)
for part_out in self.parts_out
]
def get_output_keys(self):
return {(self.name, part) for part in self.parts_out}
def __repr__(self):
return "SimpleShuffleLayer<name='{}', npartitions={}>".format(
self.name, self.npartitions
)
def is_materialized(self):
return hasattr(self, "_cached_dict")
@property
def _dict(self):
"""Materialize full dict representation"""
if hasattr(self, "_cached_dict"):
return self._cached_dict
else:
dsk = self._construct_graph()
self._cached_dict = dsk
return self._cached_dict
def __getitem__(self, key):
return self._dict[key]
def __iter__(self):
return iter(self._dict)
def __len__(self):
return len(self._dict)
def _keys_to_parts(self, keys):
"""Simple utility to convert keys to partition indices."""
parts = set()
for key in keys:
try:
_name, _part = key
except ValueError:
continue
if _name != self.name:
continue
parts.add(_part)
return parts
def _cull_dependencies(self, keys, parts_out=None):
"""Determine the necessary dependencies to produce `keys`.
For a simple shuffle, output partitions always depend on
all input partitions. This method does not require graph
materialization.
"""
deps = defaultdict(set)
parts_out = parts_out or self._keys_to_parts(keys)
for part in parts_out:
deps[(self.name, part)] |= {
(self.name_input, i) for i in range(self.npartitions_input)
}
return deps
def _cull(self, parts_out):
return SimpleShuffleLayer(
self.name,
self.column,
self.npartitions,
self.npartitions_input,
self.ignore_index,
self.name_input,
self.meta_input,
parts_out=parts_out,
)
def cull(self, keys, all_keys):
"""Cull a SimpleShuffleLayer HighLevelGraph layer.
The underlying graph will only include the necessary
tasks to produce the keys (indices) included in `parts_out`.
Therefore, "culling" the layer only requires us to reset this
parameter.
"""
parts_out = self._keys_to_parts(keys)
culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
if parts_out != set(self.parts_out):
culled_layer = self._cull(parts_out)
return culled_layer, culled_deps
else:
return self, culled_deps
def __reduce__(self):
attrs = [
"name",
"column",
"npartitions",
"npartitions_input",
"ignore_index",
"name_input",
"meta_input",
"parts_out",
"annotations",
]
return (SimpleShuffleLayer, tuple(getattr(self, attr) for attr in attrs))
def __dask_distributed_pack__(
self, all_hlg_keys, known_key_dependencies, client, client_keys
):
from distributed.protocol.serialize import to_serialize
return {
"name": self.name,
"column": self.column,
"npartitions": self.npartitions,
"npartitions_input": self.npartitions_input,
"ignore_index": self.ignore_index,
"name_input": self.name_input,
"meta_input": to_serialize(self.meta_input),
"parts_out": list(self.parts_out),
}
@classmethod
def __dask_distributed_unpack__(cls, state, dsk, dependencies):
from distributed.worker import dumps_task
        # msgpack will convert lists into tuples; here
        # we convert them back to lists
if isinstance(state["column"], tuple):
state["column"] = list(state["column"])
if "inputs" in state:
state["inputs"] = list(state["inputs"])
# Materialize the layer
layer_dsk = cls(**state)._construct_graph(deserializing=True)
# Convert all keys to strings and dump tasks
layer_dsk = {
stringify(k): stringify_collection_keys(v) for k, v in layer_dsk.items()
}
keys = layer_dsk.keys() | dsk.keys()
# TODO: use shuffle-knowledge to calculate dependencies more efficiently
deps = {k: keys_in_tasks(keys, [v]) for k, v in layer_dsk.items()}
return {"dsk": toolz.valmap(dumps_task, layer_dsk), "deps": deps}
def _construct_graph(self, deserializing=False):
"""Construct graph for a simple shuffle operation."""
shuffle_group_name = "group-" + self.name
if deserializing:
# Use CallableLazyImport objects to avoid importing dataframe
# module on the scheduler
concat_func = CallableLazyImport("dask.dataframe.core._concat")
shuffle_group_func = CallableLazyImport(
"dask.dataframe.shuffle.shuffle_group"
)
else:
# Not running on distributed scheduler - Use explicit functions
from dask.dataframe.core import _concat as concat_func
from dask.dataframe.shuffle import shuffle_group as shuffle_group_func
dsk = {}
for part_out in self.parts_out:
_concat_list = [
(self.split_name, part_out, part_in)
for part_in in range(self.npartitions_input)
]
dsk[(self.name, part_out)] = (
concat_func,
_concat_list,
self.ignore_index,
)
for _, _part_out, _part_in in _concat_list:
dsk[(self.split_name, _part_out, _part_in)] = (
operator.getitem,
(shuffle_group_name, _part_in),
_part_out,
)
if (shuffle_group_name, _part_in) not in dsk:
dsk[(shuffle_group_name, _part_in)] = (
shuffle_group_func,
(self.name_input, _part_in),
self.column,
0,
self.npartitions,
self.npartitions,
self.ignore_index,
self.npartitions,
)
return dsk
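# Illustrative task structure for SimpleShuffleLayer._construct_graph
# (hypothetical layer with npartitions=2 and npartitions_input=2; names
# abbreviated):
#
#     ("group-<name>", j)    = (shuffle_group, ("<name_input>", j), column,
#                               0, 2, 2, ignore_index, 2)
#     ("split-<name>", i, j) = (getitem, ("group-<name>", j), i)
#     ("<name>", i)          = (_concat, [("split-<name>", i, 0),
#                                         ("split-<name>", i, 1)], ignore_index)
#
# Each input partition is hashed into ``npartitions`` groups exactly once, and
# each output partition concatenates its piece of every group.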
class ShuffleLayer(SimpleShuffleLayer):
"""Shuffle-stage HighLevelGraph layer
High-level graph layer corresponding to a single stage of
a multi-stage inter-partition shuffle operation.
Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)
Parameters
----------
name : str
Name of new (partially) shuffled collection.
column : str or list of str
Column(s) to be used to map rows to output partitions (by hashing).
inputs : list of tuples
Each tuple dictates the data movement for a specific partition.
stage : int
Index of the current shuffle stage.
npartitions : int
Number of output partitions for the full (multi-stage) shuffle.
npartitions_input : int
Number of partitions in the original (un-shuffled) DataFrame.
k : int
A partition is split into this many groups during each stage.
ignore_index: bool, default False
Ignore index during shuffle. If ``True``, performance may improve,
but index values will not be preserved.
name_input : str
Name of input collection.
meta_input : pd.DataFrame-like object
Empty metadata of input collection.
parts_out : list of int (optional)
List of required output-partition indices.
annotations : dict (optional)
Layer annotations
"""
def __init__(
self,
name,
column,
inputs,
stage,
npartitions,
npartitions_input,
nsplits,
ignore_index,
name_input,
meta_input,
parts_out=None,
annotations=None,
):
self.inputs = inputs
self.stage = stage
self.nsplits = nsplits
super().__init__(
name,
column,
npartitions,
npartitions_input,
ignore_index,
name_input,
meta_input,
parts_out=parts_out or range(len(inputs)),
annotations=annotations,
)
def get_split_keys(self):
# Return ShuffleLayer "split" keys
keys = []
for part in self.parts_out:
out = self.inputs[part]
for i in range(self.nsplits):
keys.append(
stringify(
(
self.split_name,
out[self.stage],
insert(out, self.stage, i),
)
)
)
return keys
def __repr__(self):
return "ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>".format(
self.name, self.stage, self.nsplits, self.npartitions
)
def __reduce__(self):
attrs = [
"name",
"column",
"inputs",
"stage",
"npartitions",
"npartitions_input",
"nsplits",
"ignore_index",
"name_input",
"meta_input",
"parts_out",
"annotations",
]
return (ShuffleLayer, tuple(getattr(self, attr) for attr in attrs))
def __dask_distributed_pack__(self, *args, **kwargs):
ret = super().__dask_distributed_pack__(*args, **kwargs)
ret["inputs"] = self.inputs
ret["stage"] = self.stage
ret["nsplits"] = self.nsplits
return ret
def _cull_dependencies(self, keys, parts_out=None):
"""Determine the necessary dependencies to produce `keys`.
Does not require graph materialization.
"""
deps = defaultdict(set)
parts_out = parts_out or self._keys_to_parts(keys)
inp_part_map = {inp: i for i, inp in enumerate(self.inputs)}
for part in parts_out:
out = self.inputs[part]
for k in range(self.nsplits):
_inp = insert(out, self.stage, k)
_part = inp_part_map[_inp]
if self.stage == 0 and _part >= self.npartitions_input:
deps[(self.name, part)].add(("group-" + self.name, _inp, "empty"))
else:
deps[(self.name, part)].add((self.name_input, _part))
return deps
def _cull(self, parts_out):
return ShuffleLayer(
self.name,
self.column,
self.inputs,
self.stage,
self.npartitions,
self.npartitions_input,
self.nsplits,
self.ignore_index,
self.name_input,
self.meta_input,
parts_out=parts_out,
)
def _construct_graph(self, deserializing=False):
"""Construct graph for a "rearrange-by-column" stage."""
shuffle_group_name = "group-" + self.name
if deserializing:
# Use CallableLazyImport objects to avoid importing dataframe
# module on the scheduler
concat_func = CallableLazyImport("dask.dataframe.core._concat")
shuffle_group_func = CallableLazyImport(
"dask.dataframe.shuffle.shuffle_group"
)
else:
# Not running on distributed scheduler - Use explicit functions
from dask.dataframe.core import _concat as concat_func
from dask.dataframe.shuffle import shuffle_group as shuffle_group_func
dsk = {}
inp_part_map = {inp: i for i, inp in enumerate(self.inputs)}
for part in self.parts_out:
out = self.inputs[part]
_concat_list = [] # get_item tasks to concat for this output partition
for i in range(self.nsplits):
                # Extract each individual dataframe piece from the dicts
_inp = insert(out, self.stage, i)
_idx = out[self.stage]
_concat_list.append((self.split_name, _idx, _inp))
# concatenate those pieces together, with their friends
dsk[(self.name, part)] = (
concat_func,
_concat_list,
self.ignore_index,
)
for _, _idx, _inp in _concat_list:
dsk[(self.split_name, _idx, _inp)] = (
operator.getitem,
(shuffle_group_name, _inp),
_idx,
)
if (shuffle_group_name, _inp) not in dsk:
# Initial partitions (output of previous stage)
_part = inp_part_map[_inp]
if self.stage == 0:
if _part < self.npartitions_input:
input_key = (self.name_input, _part)
else:
                            # In order to make sure that to_serialize() serializes the
                            # empty dataframe input, we add it as a key.
input_key = (shuffle_group_name, _inp, "empty")
dsk[input_key] = self.meta_input
else:
input_key = (self.name_input, _part)
# Convert partition into dict of dataframe pieces
dsk[(shuffle_group_name, _inp)] = (
shuffle_group_func,
input_key,
self.column,
self.stage,
self.nsplits,
self.npartitions_input,
self.ignore_index,
self.npartitions,
)
return dsk
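# Illustrative staging (hypothetical sizes): for a two-stage shuffle of 4
# partitions with nsplits=2, ``inputs`` typically enumerates the two-digit
# base-2 indices (0, 0), (0, 1), (1, 0), (1, 1); each stage regroups the data
# along one digit, so the full shuffle finishes in two passes instead of
# moving every input partition directly to every output partition.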
class BroadcastJoinLayer(Layer):
"""Broadcast-based Join Layer
High-level graph layer for a join operation requiring the
smaller collection to be broadcasted to every partition of
the larger collection.
Parameters
----------
name : str
Name of new (joined) output collection.
lhs_name: string
"Left" DataFrame collection to join.
lhs_npartitions: int
Number of partitions in "left" DataFrame collection.
rhs_name: string
"Right" DataFrame collection to join.
rhs_npartitions: int
Number of partitions in "right" DataFrame collection.
parts_out : list of int (optional)
List of required output-partition indices.
annotations : dict (optional)
Layer annotations.
    **merge_kwargs : dict
        Keyword arguments to be passed to the chunkwise merge function.
"""
def __init__(
self,
name,
npartitions,
lhs_name,
lhs_npartitions,
rhs_name,
rhs_npartitions,
parts_out=None,
annotations=None,
left_on=None,
right_on=None,
**merge_kwargs,
):
super().__init__(annotations=annotations)
self.name = name
self.npartitions = npartitions
self.lhs_name = lhs_name
self.lhs_npartitions = lhs_npartitions
self.rhs_name = rhs_name
self.rhs_npartitions = rhs_npartitions
self.parts_out = parts_out or set(range(self.npartitions))
self.left_on = tuple(left_on) if isinstance(left_on, list) else left_on
self.right_on = tuple(right_on) if isinstance(right_on, list) else right_on
self.merge_kwargs = merge_kwargs
self.how = self.merge_kwargs.get("how")
self.merge_kwargs["left_on"] = self.left_on
self.merge_kwargs["right_on"] = self.right_on
def get_output_keys(self):
return {(self.name, part) for part in self.parts_out}
def __repr__(self):
return "BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>".format(
self.name, self.how, self.lhs_name, self.rhs_name
)
def is_materialized(self):
return hasattr(self, "_cached_dict")
@property
def _dict(self):
"""Materialize full dict representation"""
if hasattr(self, "_cached_dict"):
return self._cached_dict
else:
dsk = self._construct_graph()
self._cached_dict = dsk
return self._cached_dict
def __getitem__(self, key):
return self._dict[key]
def __iter__(self):
return iter(self._dict)
def __len__(self):
return len(self._dict)
def __dask_distributed_pack__(self, *args, **kwargs):
import pickle
        # Pickle complex merge_kwargs elements, including tuples,
        # which may otherwise be confused with task keys.
_merge_kwargs = {}
for k, v in self.merge_kwargs.items():
if not isinstance(v, (str, list, bool)):
_merge_kwargs[k] = pickle.dumps(v)
else:
_merge_kwargs[k] = v
return {
"name": self.name,
"npartitions": self.npartitions,
"lhs_name": self.lhs_name,
"lhs_npartitions": self.lhs_npartitions,
"rhs_name": self.rhs_name,
"rhs_npartitions": self.rhs_npartitions,
"parts_out": self.parts_out,
"merge_kwargs": _merge_kwargs,
}
@classmethod
def __dask_distributed_unpack__(cls, state, dsk, dependencies):
from distributed.worker import dumps_task
# Expand merge_kwargs
merge_kwargs = state.pop("merge_kwargs", {})
state.update(merge_kwargs)
# Materialize the layer
raw = cls(**state)._construct_graph(deserializing=True)
# Convert all keys to strings and dump tasks
raw = {stringify(k): stringify_collection_keys(v) for k, v in raw.items()}
keys = raw.keys() | dsk.keys()
deps = {k: keys_in_tasks(keys, [v]) for k, v in raw.items()}
return {"dsk": toolz.valmap(dumps_task, raw), "deps": deps}
def _keys_to_parts(self, keys):
"""Simple utility to convert keys to partition indices."""
parts = set()
for key in keys:
try:
_name, _part = key
except ValueError:
continue
if _name != self.name:
continue
parts.add(_part)
return parts
@property
def _broadcast_plan(self):
# Return structure (tuple):
# (
# <broadcasted-collection-name>,
# <broadcasted-collection-npartitions>,
# <other-collection-npartitions>,
# <other-collection-on>,
# )
if self.lhs_npartitions < self.rhs_npartitions:
# Broadcasting the left
return (
self.lhs_name,
self.lhs_npartitions,
self.rhs_name,
self.right_on,
)
else:
# Broadcasting the right
return (
self.rhs_name,
self.rhs_npartitions,
self.lhs_name,
self.left_on,
)
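    # Illustrative example (hypothetical sizes): with lhs_npartitions=2 and
    # rhs_npartitions=10, the left side is broadcast and ``_broadcast_plan``
    # evaluates to (self.lhs_name, 2, self.rhs_name, self.right_on).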
def _cull_dependencies(self, keys, parts_out=None):
"""Determine the necessary dependencies to produce `keys`.
For a broadcast join, output partitions always depend on
all partitions of the broadcasted collection, but only one
partition of the "other" collection.
"""
# Get broadcast info
bcast_name, bcast_size, other_name = self._broadcast_plan[:3]
deps = defaultdict(set)
parts_out = parts_out or self._keys_to_parts(keys)
for part in parts_out:
deps[(self.name, part)] |= {(bcast_name, i) for i in range(bcast_size)}
deps[(self.name, part)] |= {
(other_name, part),
}
return deps
def _cull(self, parts_out):
return BroadcastJoinLayer(
self.name,
self.npartitions,
self.lhs_name,
self.lhs_npartitions,
self.rhs_name,
self.rhs_npartitions,
annotations=self.annotations,
parts_out=parts_out,
**self.merge_kwargs,
)
def cull(self, keys, all_keys):
"""Cull a BroadcastJoinLayer HighLevelGraph layer.
The underlying graph will only include the necessary
tasks to produce the keys (indices) included in `parts_out`.
Therefore, "culling" the layer only requires us to reset this
parameter.
"""
parts_out = self._keys_to_parts(keys)
culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
if parts_out != set(self.parts_out):
culled_layer = self._cull(parts_out)
return culled_layer, culled_deps
else:
return self, culled_deps
def _construct_graph(self, deserializing=False):
"""Construct graph for a broadcast join operation."""
inter_name = "inter-" + self.name
split_name = "split-" + self.name
if deserializing:
# Use CallableLazyImport objects to avoid importing dataframe
# module on the scheduler
split_partition_func = CallableLazyImport(
"dask.dataframe.multi._split_partition"
)
concat_func = CallableLazyImport("dask.dataframe.multi._concat_wrapper")
merge_chunk_func = CallableLazyImport(
"dask.dataframe.multi._merge_chunk_wrapper"
)
else:
# Not running on distributed scheduler - Use explicit functions
from dask.dataframe.multi import _concat_wrapper as concat_func
from dask.dataframe.multi import _merge_chunk_wrapper as merge_chunk_func
from dask.dataframe.multi import _split_partition as split_partition_func
# Get broadcast "plan"
bcast_name, bcast_size, other_name, other_on = self._broadcast_plan
bcast_side = "left" if self.lhs_npartitions < self.rhs_npartitions else "right"
# Loop over output partitions, which should be a 1:1
# mapping with the input partitions of "other".
# Culling should allow us to avoid generating tasks for
# any output partitions that are not requested (via `parts_out`)
dsk = {}
for i in self.parts_out:
# Split each "other" partition by hash
if self.how != "inner":
dsk[(split_name, i)] = (
split_partition_func,
(other_name, i),
other_on,
bcast_size,
)
# For each partition of "other", we need to join
# to each partition of "bcast". If it is a "left"
# or "right" join, there should be a unique mapping
# between the local splits of "other" and the
# partitions of "bcast" (which means we need an
# additional `getitem` operation to isolate the
# correct split of each "other" partition).
_concat_list = []
for j in range(bcast_size):
# Specify arg list for `merge_chunk`
_merge_args = [
(
operator.getitem,
(split_name, i),
j,
)
if self.how != "inner"
else (other_name, i),
(bcast_name, j),
]
if bcast_side == "left":
# If the left is broadcasted, the
# arg list needs to be reversed
_merge_args.reverse()
inter_key = (inter_name, i, j)
dsk[inter_key] = (
apply,
merge_chunk_func,
_merge_args,
self.merge_kwargs,
)
_concat_list.append(inter_key)
# Concatenate the merged results for each output partition
dsk[(self.name, i)] = (concat_func, _concat_list)
return dsk
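# Illustrative task structure for BroadcastJoinLayer._construct_graph
# (hypothetical inner join that broadcasts the right side with bcast_size=2;
# names abbreviated):
#
#     ("inter-<name>", i, j) = (apply, merge_chunk,
#                               [("<lhs_name>", i), ("<rhs_name>", j)],
#                               merge_kwargs)
#     ("<name>", i)          = (_concat_wrapper,
#                               [("inter-<name>", i, 0), ("inter-<name>", i, 1)])
#
# For non-inner joins, each "other" partition i is first split on the join
# column(s) by ("split-<name>", i), and split j is merged with broadcast
# partition j.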
class DataFrameIOLayer(Blockwise):
"""DataFrame-based Blockwise Layer with IO
Parameters
----------
name : str
Name to use for the constructed layer.
columns : str, list or None
Field name(s) to read in as columns in the output.
inputs : list or BlockwiseDep
List of arguments to be passed to ``io_func`` so
that the materialized task to produce partition ``i``
will be: ``(<io_func>, inputs[i])``. Note that each
element of ``inputs`` is typically a tuple of arguments.
io_func : callable
A callable function that takes in a single tuple
of arguments, and outputs a DataFrame partition.
Column projection will be supported for functions
that satisfy the ``DataFrameIOFunction`` protocol.
label : str (optional)
String to use as a prefix in the place-holder collection
name. If nothing is specified (default), "subset-" will
be used.
produces_tasks : bool (optional)
Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument is only used for
serialization purposes, and will be deprecated in the
future. Default is False.
creation_info: dict (optional)
Dictionary containing the callable function ('func'),
positional arguments ('args'), and key-word arguments
('kwargs') used to produce the dask collection with
this underlying ``DataFrameIOLayer``.
annotations: dict (optional)
Layer annotations to pass through to Blockwise.
"""
def __init__(
self,
name,
columns,
inputs,
io_func,
label=None,
produces_tasks=False,
creation_info=None,
annotations=None,
):
self.name = name
self._columns = columns
self.inputs = inputs
self.io_func = io_func
self.label = label
self.produces_tasks = produces_tasks
self.annotations = annotations
self.creation_info = creation_info
if not isinstance(inputs, BlockwiseDep):
# Define mapping between key index and "part"
io_arg_map = BlockwiseDepDict(
{(i,): inp for i, inp in enumerate(self.inputs)},
produces_tasks=self.produces_tasks,
)
else:
io_arg_map = inputs
# Use Blockwise initializer
dsk = {self.name: (io_func, blockwise_token(0))}
super().__init__(
output=self.name,
output_indices="i",
dsk=dsk,
indices=[(io_arg_map, "i")],
numblocks={},
annotations=annotations,
)
@property
def columns(self):
"""Current column projection for this layer"""
return self._columns
def project_columns(self, columns):
"""Produce a column projection for this IO layer.
Given a list of required output columns, this method
returns the projected layer.
"""
from dask.dataframe.io.utils import DataFrameIOFunction
columns = list(columns)
if self.columns is None or set(self.columns).issuperset(columns):
# Apply column projection in IO function.
# Must satisfy `DataFrameIOFunction` protocol
if isinstance(self.io_func, DataFrameIOFunction):
io_func = self.io_func.project_columns(columns)
else:
io_func = self.io_func
layer = DataFrameIOLayer(
(self.label or "subset") + "-" + tokenize(self.name, columns),
columns,
self.inputs,
io_func,
label=self.label,
produces_tasks=self.produces_tasks,
annotations=self.annotations,
)
return layer
else:
# Default behavior
return self
def __repr__(self):
return "DataFrameIOLayer<name='{}', n_parts={}, columns={}>".format(
self.name, len(self.inputs), self.columns
)
class DataFrameTreeReduction(Layer):
"""DataFrame Tree-Reduction Layer
Parameters
----------
name : str
Name to use for the constructed layer.
name_input : str
Name of the input layer that is being reduced.
    npartitions_input : int
Number of partitions in the input layer.
concat_func : callable
Function used by each tree node to reduce a list of inputs
into a single output value. This function must accept only
a list as its first positional argument.
tree_node_func : callable
Function used on the output of ``concat_func`` in each tree
node. This function must accept the output of ``concat_func``
as its first positional argument.
finalize_func : callable, optional
Function used in place of ``tree_node_func`` on the final tree
node(s) to produce the final output for each split. By default,
``tree_node_func`` will be used.
split_every : int, optional
This argument specifies the maximum number of input nodes
to be handled by any one task in the tree. Defaults to 32.
split_out : int, optional
This argument specifies the number of output nodes in the
reduction tree. If ``split_out`` is set to an integer >=1, the
input tasks must contain data that can be indexed by a ``getitem``
operation with a key in the range ``[0, split_out)``.
output_partitions : list, optional
List of required output partitions. This parameter is used
internally by Dask for high-level culling.
tree_node_name : str, optional
Name to use for intermediate tree-node tasks.
"""
name: str
name_input: str
npartitions_input: int
concat_func: Callable
tree_node_func: Callable
finalize_func: Callable | None
split_every: int
split_out: int
output_partitions: list[int]
tree_node_name: str
widths: list[int]
height: int
def __init__(
self,
name: str,
name_input: str,
npartitions_input: int,
concat_func: Callable,
tree_node_func: Callable,
finalize_func: Callable | None = None,
split_every: int = 32,
split_out: int | None = None,
output_partitions: list[int] | None = None,
tree_node_name: str | None = None,
annotations: dict[str, Any] | None = None,
):
super().__init__(annotations=annotations)
self.name = name
self.name_input = name_input
self.npartitions_input = npartitions_input
self.concat_func = concat_func
self.tree_node_func = tree_node_func
self.finalize_func = finalize_func
self.split_every = split_every
self.split_out = split_out # type: ignore
self.output_partitions = (
list(range(self.split_out or 1))
if output_partitions is None
else output_partitions
)
self.tree_node_name = tree_node_name or "tree_node-" + self.name
# Calculate tree widths and height
# (Used to get output keys without materializing)
parts = self.npartitions_input
self.widths = [parts]
while parts > 1:
parts = math.ceil(parts / self.split_every)
self.widths.append(int(parts))
self.height = len(self.widths)
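        # Illustrative example (hypothetical sizes): npartitions_input=100 with
        # split_every=32 gives widths == [100, 4, 1] and height == 3, i.e. one
        # intermediate tree level between the inputs and the final node.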
def _make_key(self, *name_parts, split=0):
        # Helper function to construct a key
        # with a "split" element when
        # bool(split_out) is True
return name_parts + (split,) if self.split_out else name_parts
def _define_task(self, input_keys, final_task=False):
# Define nested concatenation and func task
if final_task and self.finalize_func:
outer_func = self.finalize_func
else:
outer_func = self.tree_node_func
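        # When executed, (toolz.pipe, input_keys, concat_func, outer_func)
        # evaluates to outer_func(concat_func(inputs)), where ``inputs`` is the
        # list of already-computed values for ``input_keys``.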
return (toolz.pipe, input_keys, self.concat_func, outer_func)
def _construct_graph(self):
"""Construct graph for a tree reduction."""
dsk = {}
if not self.output_partitions:
return dsk
# Deal with `bool(split_out) == True`.
# These cases require that the input tasks
# return a type that enables getitem operation
# with indices: [0, split_out)
# Therefore, we must add "getitem" tasks to
# select the appropriate element for each split
name_input_use = self.name_input
if self.split_out:
name_input_use += "-split"
for s in self.output_partitions:
for p in range(self.npartitions_input):
dsk[self._make_key(name_input_use, p, split=s)] = (
operator.getitem,
(self.name_input, p),
s,
)
if self.height >= 2:
# Loop over output splits
for s in self.output_partitions:
# Loop over reduction levels
for depth in range(1, self.height):
# Loop over reduction groups
for group in range(self.widths[depth]):
# Calculate inputs for the current group
p_max = self.widths[depth - 1]
lstart = self.split_every * group
lstop = min(lstart + self.split_every, p_max)
if depth == 1:
# Input nodes are from input layer
input_keys = [
self._make_key(name_input_use, p, split=s)
for p in range(lstart, lstop)
]
else:
# Input nodes are tree-reduction nodes
input_keys = [
self._make_key(
self.tree_node_name, p, depth - 1, split=s
)
for p in range(lstart, lstop)
]
# Define task
if depth == self.height - 1:
                            # Final node (uses ``finalize_func`` via _define_task when provided)
assert (
group == 0
), f"group = {group}, not 0 for final tree reduction task"
dsk[(self.name, s)] = self._define_task(
input_keys, final_task=True
)
else:
# Intermediate Node
dsk[
self._make_key(
self.tree_node_name, group, depth, split=s
)
] = self._define_task(input_keys, final_task=False)
else:
# Deal with single-partition case
for s in self.output_partitions:
input_keys = [self._make_key(name_input_use, 0, split=s)]
dsk[(self.name, s)] = self._define_task(input_keys, final_task=True)
return dsk
def __repr__(self):
return "DataFrameTreeReduction<name='{}', input_name={}, split_out={}>".format(
self.name, self.name_input, self.split_out
)
def _output_keys(self):
return {(self.name, s) for s in self.output_partitions}
def get_output_keys(self):
if hasattr(self, "_cached_output_keys"):
return self._cached_output_keys
else:
output_keys = self._output_keys()
self._cached_output_keys = output_keys
return self._cached_output_keys
def is_materialized(self):
return hasattr(self, "_cached_dict")
@property
def _dict(self):
"""Materialize full dict representation"""
if hasattr(self, "_cached_dict"):
return self._cached_dict
else:
dsk = self._construct_graph()
self._cached_dict = dsk
return self._cached_dict
def __getitem__(self, key):
return self._dict[key]
def __iter__(self):
return iter(self._dict)
def __len__(self):
# Start with "base" tree-reduction size
tree_size = (sum(self.widths[1:]) or 1) * (self.split_out or 1)
if self.split_out:
# Add on "split-*" tasks used for `getitem` ops
return tree_size + self.npartitions_input * len(self.output_partitions)
return tree_size
def _keys_to_output_partitions(self, keys):
"""Simple utility to convert keys to output partition indices."""
splits = set()
for key in keys:
try:
_name, _split = key
except ValueError:
continue
if _name != self.name:
continue
splits.add(_split)
return splits
def _cull(self, output_partitions):
return DataFrameTreeReduction(
self.name,
self.name_input,
self.npartitions_input,
self.concat_func,
self.tree_node_func,
finalize_func=self.finalize_func,
split_every=self.split_every,
split_out=self.split_out,
output_partitions=output_partitions,
tree_node_name=self.tree_node_name,
annotations=self.annotations,
)
def cull(self, keys, all_keys):
"""Cull a DataFrameTreeReduction HighLevelGraph layer"""
deps = {
(self.name, 0): {
(self.name_input, i) for i in range(self.npartitions_input)
}
}
output_partitions = self._keys_to_output_partitions(keys)
if output_partitions != set(self.output_partitions):
culled_layer = self._cull(output_partitions)
return culled_layer, deps
else:
return self, deps
def __dask_distributed_pack__(self, *args, **kwargs):
from distributed.protocol.serialize import to_serialize
# Pickle the (possibly) user-defined functions here
_concat_func = to_serialize(self.concat_func)
_tree_node_func = to_serialize(self.tree_node_func)
if self.finalize_func:
_finalize_func = to_serialize(self.finalize_func)
else:
_finalize_func = None
return {
"name": self.name,
"name_input": self.name_input,
"npartitions_input": self.npartitions_input,
"concat_func": _concat_func,
"tree_node_func": _tree_node_func,
"finalize_func": _finalize_func,
"split_every": self.split_every,
"split_out": self.split_out,
"output_partitions": self.output_partitions,
"tree_node_name": self.tree_node_name,
}
@classmethod
def __dask_distributed_unpack__(cls, state, dsk, dependencies):
from distributed.protocol.serialize import to_serialize
# Materialize the layer
raw = cls(**state)._construct_graph()
# Convert all keys to strings and dump tasks
raw = {stringify(k): stringify_collection_keys(v) for k, v in raw.items()}
keys = raw.keys() | dsk.keys()
deps = {k: keys_in_tasks(keys, [v]) for k, v in raw.items()}
# Must use `to_serialize` on the entire task.
# This is required because the task-tuples contain `Serialized`
# function objects instead of real functions. Using `dumps_task`
# may or may not correctly wrap the entire tuple in `to_serialize`.
# So we use `to_serialize` here to be explicit. When the task
# arrives at a worker, both the `Serialized` task-tuples and the
        # `Serialized` functions nested within them should be deserialized
# automatically by the comm.
return {"dsk": toolz.valmap(to_serialize, raw), "deps": deps}