|
|
|
|
|
|
|
from collections import Counter |
|
from contextlib import suppress |
|
from typing import NamedTuple |
|
|
|
import numpy as np |
|
|
|
from ._array_api import ( |
|
_isin, |
|
_searchsorted, |
|
_setdiff1d, |
|
device, |
|
get_namespace, |
|
) |
|
from ._missing import is_scalar_nan |
|
|
|
|
|
def _unique(values, *, return_inverse=False, return_counts=False): |
|
"""Helper function to find unique values with support for python objects. |
|
|
|
Uses pure python method for object dtype, and numpy method for |
|
all other dtypes. |
|
|
|
Parameters |
|
---------- |
|
values : ndarray |
|
Values to check for unknowns. |
|
|
|
return_inverse : bool, default=False |
|
If True, also return the indices of the unique values. |
|
|
|
return_counts : bool, default=False |
|
If True, also return the number of times each unique item appears in |
|
values. |
|
|
|
Returns |
|
------- |
|
unique : ndarray |
|
The sorted unique values. |
|
|
|
unique_inverse : ndarray |
|
The indices to reconstruct the original array from the unique array. |
|
Only provided if `return_inverse` is True. |
|
|
|
unique_counts : ndarray |
|
The number of times each of the unique values comes up in the original |
|
array. Only provided if `return_counts` is True. |
|
""" |
|
if values.dtype == object: |
|
return _unique_python( |
|
values, return_inverse=return_inverse, return_counts=return_counts |
|
) |
|
|
|
return _unique_np( |
|
values, return_inverse=return_inverse, return_counts=return_counts |
|
) |
|
|
|
|
|
def _unique_np(values, return_inverse=False, return_counts=False): |
|
"""Helper function to find unique values for numpy arrays that correctly |
|
accounts for nans. See `_unique` documentation for details.""" |
|
xp, _ = get_namespace(values) |
|
|
|
inverse, counts = None, None |
|
|
|
if return_inverse and return_counts: |
|
uniques, _, inverse, counts = xp.unique_all(values) |
|
elif return_inverse: |
|
uniques, inverse = xp.unique_inverse(values) |
|
elif return_counts: |
|
uniques, counts = xp.unique_counts(values) |
|
else: |
|
uniques = xp.unique_values(values) |
|
|
|
|
|
|
|
if uniques.size and is_scalar_nan(uniques[-1]): |
|
nan_idx = _searchsorted(uniques, xp.nan, xp=xp) |
|
uniques = uniques[: nan_idx + 1] |
|
if return_inverse: |
|
inverse[inverse > nan_idx] = nan_idx |
|
|
|
if return_counts: |
|
counts[nan_idx] = xp.sum(counts[nan_idx:]) |
|
counts = counts[: nan_idx + 1] |
|
|
|
ret = (uniques,) |
|
|
|
if return_inverse: |
|
ret += (inverse,) |
|
|
|
if return_counts: |
|
ret += (counts,) |
|
|
|
return ret[0] if len(ret) == 1 else ret |
|
|
|
|
|
class MissingValues(NamedTuple): |
|
"""Data class for missing data information""" |
|
|
|
nan: bool |
|
none: bool |
|
|
|
def to_list(self): |
|
"""Convert tuple to a list where None is always first.""" |
|
output = [] |
|
if self.none: |
|
output.append(None) |
|
if self.nan: |
|
output.append(np.nan) |
|
return output |
|
|
|
|
|
def _extract_missing(values): |
|
"""Extract missing values from `values`. |
|
|
|
Parameters |
|
---------- |
|
values: set |
|
Set of values to extract missing from. |
|
|
|
Returns |
|
------- |
|
output: set |
|
Set with missing values extracted. |
|
|
|
missing_values: MissingValues |
|
Object with missing value information. |
|
""" |
|
missing_values_set = { |
|
value for value in values if value is None or is_scalar_nan(value) |
|
} |
|
|
|
if not missing_values_set: |
|
return values, MissingValues(nan=False, none=False) |
|
|
|
if None in missing_values_set: |
|
if len(missing_values_set) == 1: |
|
output_missing_values = MissingValues(nan=False, none=True) |
|
else: |
|
|
|
|
|
output_missing_values = MissingValues(nan=True, none=True) |
|
else: |
|
output_missing_values = MissingValues(nan=True, none=False) |
|
|
|
|
|
output = values - missing_values_set |
|
return output, output_missing_values |
|
|
|
|
|
class _nandict(dict): |
|
"""Dictionary with support for nans.""" |
|
|
|
def __init__(self, mapping): |
|
super().__init__(mapping) |
|
for key, value in mapping.items(): |
|
if is_scalar_nan(key): |
|
self.nan_value = value |
|
break |
|
|
|
def __missing__(self, key): |
|
if hasattr(self, "nan_value") and is_scalar_nan(key): |
|
return self.nan_value |
|
raise KeyError(key) |
|
|
|
|
|
def _map_to_integer(values, uniques): |
|
"""Map values based on its position in uniques.""" |
|
xp, _ = get_namespace(values, uniques) |
|
table = _nandict({val: i for i, val in enumerate(uniques)}) |
|
return xp.asarray([table[v] for v in values], device=device(values)) |
|
|
|
|
|
def _unique_python(values, *, return_inverse, return_counts): |
|
|
|
try: |
|
uniques_set = set(values) |
|
uniques_set, missing_values = _extract_missing(uniques_set) |
|
|
|
uniques = sorted(uniques_set) |
|
uniques.extend(missing_values.to_list()) |
|
uniques = np.array(uniques, dtype=values.dtype) |
|
except TypeError: |
|
types = sorted(t.__qualname__ for t in set(type(v) for v in values)) |
|
raise TypeError( |
|
"Encoders require their input argument must be uniformly " |
|
f"strings or numbers. Got {types}" |
|
) |
|
ret = (uniques,) |
|
|
|
if return_inverse: |
|
ret += (_map_to_integer(values, uniques),) |
|
|
|
if return_counts: |
|
ret += (_get_counts(values, uniques),) |
|
|
|
return ret[0] if len(ret) == 1 else ret |
|
|
|
|
|
def _encode(values, *, uniques, check_unknown=True): |
|
"""Helper function to encode values into [0, n_uniques - 1]. |
|
|
|
Uses pure python method for object dtype, and numpy method for |
|
all other dtypes. |
|
The numpy method has the limitation that the `uniques` need to |
|
be sorted. Importantly, this is not checked but assumed to already be |
|
the case. The calling method needs to ensure this for all non-object |
|
values. |
|
|
|
Parameters |
|
---------- |
|
values : ndarray |
|
Values to encode. |
|
uniques : ndarray |
|
The unique values in `values`. If the dtype is not object, then |
|
`uniques` needs to be sorted. |
|
check_unknown : bool, default=True |
|
If True, check for values in `values` that are not in `unique` |
|
and raise an error. This is ignored for object dtype, and treated as |
|
True in this case. This parameter is useful for |
|
_BaseEncoder._transform() to avoid calling _check_unknown() |
|
twice. |
|
|
|
Returns |
|
------- |
|
encoded : ndarray |
|
Encoded values |
|
""" |
|
xp, _ = get_namespace(values, uniques) |
|
if not xp.isdtype(values.dtype, "numeric"): |
|
try: |
|
return _map_to_integer(values, uniques) |
|
except KeyError as e: |
|
raise ValueError(f"y contains previously unseen labels: {str(e)}") |
|
else: |
|
if check_unknown: |
|
diff = _check_unknown(values, uniques) |
|
if diff: |
|
raise ValueError(f"y contains previously unseen labels: {str(diff)}") |
|
return _searchsorted(uniques, values, xp=xp) |
|
|
|
|
|
def _check_unknown(values, known_values, return_mask=False): |
|
""" |
|
Helper function to check for unknowns in values to be encoded. |
|
|
|
Uses pure python method for object dtype, and numpy method for |
|
all other dtypes. |
|
|
|
Parameters |
|
---------- |
|
values : array |
|
Values to check for unknowns. |
|
known_values : array |
|
Known values. Must be unique. |
|
return_mask : bool, default=False |
|
If True, return a mask of the same shape as `values` indicating |
|
the valid values. |
|
|
|
Returns |
|
------- |
|
diff : list |
|
The unique values present in `values` and not in `know_values`. |
|
valid_mask : boolean array |
|
Additionally returned if ``return_mask=True``. |
|
|
|
""" |
|
xp, _ = get_namespace(values, known_values) |
|
valid_mask = None |
|
|
|
if not xp.isdtype(values.dtype, "numeric"): |
|
values_set = set(values) |
|
values_set, missing_in_values = _extract_missing(values_set) |
|
|
|
uniques_set = set(known_values) |
|
uniques_set, missing_in_uniques = _extract_missing(uniques_set) |
|
diff = values_set - uniques_set |
|
|
|
nan_in_diff = missing_in_values.nan and not missing_in_uniques.nan |
|
none_in_diff = missing_in_values.none and not missing_in_uniques.none |
|
|
|
def is_valid(value): |
|
return ( |
|
value in uniques_set |
|
or missing_in_uniques.none |
|
and value is None |
|
or missing_in_uniques.nan |
|
and is_scalar_nan(value) |
|
) |
|
|
|
if return_mask: |
|
if diff or nan_in_diff or none_in_diff: |
|
valid_mask = xp.array([is_valid(value) for value in values]) |
|
else: |
|
valid_mask = xp.ones(len(values), dtype=xp.bool) |
|
|
|
diff = list(diff) |
|
if none_in_diff: |
|
diff.append(None) |
|
if nan_in_diff: |
|
diff.append(np.nan) |
|
else: |
|
unique_values = xp.unique_values(values) |
|
diff = _setdiff1d(unique_values, known_values, xp, assume_unique=True) |
|
if return_mask: |
|
if diff.size: |
|
valid_mask = _isin(values, known_values, xp) |
|
else: |
|
valid_mask = xp.ones(len(values), dtype=xp.bool) |
|
|
|
|
|
if xp.any(xp.isnan(known_values)): |
|
diff_is_nan = xp.isnan(diff) |
|
if xp.any(diff_is_nan): |
|
|
|
if diff.size and return_mask: |
|
is_nan = xp.isnan(values) |
|
valid_mask[is_nan] = 1 |
|
|
|
|
|
diff = diff[~diff_is_nan] |
|
diff = list(diff) |
|
|
|
if return_mask: |
|
return diff, valid_mask |
|
return diff |
|
|
|
|
|
class _NaNCounter(Counter): |
|
"""Counter with support for nan values.""" |
|
|
|
def __init__(self, items): |
|
super().__init__(self._generate_items(items)) |
|
|
|
def _generate_items(self, items): |
|
"""Generate items without nans. Stores the nan counts separately.""" |
|
for item in items: |
|
if not is_scalar_nan(item): |
|
yield item |
|
continue |
|
if not hasattr(self, "nan_count"): |
|
self.nan_count = 0 |
|
self.nan_count += 1 |
|
|
|
def __missing__(self, key): |
|
if hasattr(self, "nan_count") and is_scalar_nan(key): |
|
return self.nan_count |
|
raise KeyError(key) |
|
|
|
|
|
def _get_counts(values, uniques): |
|
"""Get the count of each of the `uniques` in `values`. |
|
|
|
The counts will use the order passed in by `uniques`. For non-object dtypes, |
|
`uniques` is assumed to be sorted and `np.nan` is at the end. |
|
""" |
|
if values.dtype.kind in "OU": |
|
counter = _NaNCounter(values) |
|
output = np.zeros(len(uniques), dtype=np.int64) |
|
for i, item in enumerate(uniques): |
|
with suppress(KeyError): |
|
output[i] = counter[item] |
|
return output |
|
|
|
unique_values, counts = _unique_np(values, return_counts=True) |
|
|
|
|
|
uniques_in_values = np.isin(uniques, unique_values, assume_unique=True) |
|
if np.isnan(unique_values[-1]) and np.isnan(uniques[-1]): |
|
uniques_in_values[-1] = True |
|
|
|
unique_valid_indices = np.searchsorted(unique_values, uniques[uniques_in_values]) |
|
output = np.zeros_like(uniques, dtype=np.int64) |
|
output[uniques_in_values] = counts[unique_valid_indices] |
|
return output |
|
|