"""Isotonic regression for obtaining monotonic fit to data."""

import math
import warnings
from numbers import Real

import numpy as np
from scipy import interpolate, optimize
from scipy.stats import spearmanr

from sklearn.utils import metadata_routing

from ._isotonic import _inplace_contiguous_isotonic_regression, _make_unique
from .base import BaseEstimator, RegressorMixin, TransformerMixin, _fit_context
from .utils import check_array, check_consistent_length
from .utils._param_validation import Interval, StrOptions, validate_params
from .utils.fixes import parse_version, sp_base_version
from .utils.validation import _check_sample_weight, check_is_fitted

__all__ = ["check_increasing", "isotonic_regression", "IsotonicRegression"]


@validate_params(
    {
        "x": ["array-like"],
        "y": ["array-like"],
    },
    prefer_skip_nested_validation=True,
)
def check_increasing(x, y):
    """Determine whether y is monotonically correlated with x.

    y is found increasing or decreasing with respect to x based on a Spearman
    correlation test.

    Parameters
    ----------
    x : array-like of shape (n_samples,)
        Training data.

    y : array-like of shape (n_samples,)
        Training target.

    Returns
    -------
    increasing_bool : boolean
        Whether the relationship is increasing or decreasing.

    Notes
    -----
    The Spearman correlation coefficient is estimated from the data, and the
    sign of the resulting estimate is used as the result.

    In the event that the 95% confidence interval based on Fisher transform
    spans zero, a warning is raised.

    References
    ----------
    Fisher transformation. Wikipedia.
    https://en.wikipedia.org/wiki/Fisher_transformation

    Examples
    --------
    >>> from sklearn.isotonic import check_increasing
    >>> x, y = [1, 2, 3, 4, 5], [2, 4, 6, 8, 10]
    >>> check_increasing(x, y)
    np.True_
    >>> y = [10, 8, 6, 4, 2]
    >>> check_increasing(x, y)
    np.False_
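
    With noisy data that is only weakly monotonic, the sign of the estimate
    is still returned, but a warning reports that the 95% confidence
    interval spans zero:

    >>> x, y = [1, 2, 3, 4, 5], [2, 1, 4, 3, 5]
    >>> check_increasing(x, y)  # warns: confidence interval spans zero
    np.True_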
    """
    # Calculate the Spearman rho estimate and set the return value from its
    # sign.
    rho, _ = spearmanr(x, y)
    increasing_bool = rho >= 0

    # Run the Fisher transform to get the rho confidence interval, but
    # handle rho = +/-1, where the transform diverges.
    if rho not in [-1.0, 1.0] and len(x) > 3:
        F = 0.5 * math.log((1.0 + rho) / (1.0 - rho))
        F_se = 1 / math.sqrt(len(x) - 3)

        # Use a 95% CI, i.e., +/-1.96 standard errors.
        # https://en.wikipedia.org/wiki/Fisher_transformation
        rho_0 = math.tanh(F - 1.96 * F_se)
        rho_1 = math.tanh(F + 1.96 * F_se)

        # Warn if the CI spans zero.
        if np.sign(rho_0) != np.sign(rho_1):
            warnings.warn(
                "Confidence interval of the Spearman "
                "correlation coefficient spans zero. "
                "Determination of ``increasing`` may be "
                "suspect."
            )

    return increasing_bool


@validate_params(
    {
        "y": ["array-like"],
        "sample_weight": ["array-like", None],
        "y_min": [Interval(Real, None, None, closed="both"), None],
        "y_max": [Interval(Real, None, None, closed="both"), None],
        "increasing": ["boolean"],
    },
    prefer_skip_nested_validation=True,
)
def isotonic_regression(
    y, *, sample_weight=None, y_min=None, y_max=None, increasing=True
):
    """Solve the isotonic regression model.

    Read more in the :ref:`User Guide <isotonic>`.

    Parameters
    ----------
    y : array-like of shape (n_samples,)
        The data.

    sample_weight : array-like of shape (n_samples,), default=None
        Weights on each point of the regression.
        If None, weight is set to 1 (equal weights).

    y_min : float, default=None
        Lower bound on the lowest predicted value (the minimum value may
        still be higher). If not set, defaults to -inf.

    y_max : float, default=None
        Upper bound on the highest predicted value (the maximum may still be
        lower). If not set, defaults to +inf.

    increasing : bool, default=True
        Whether the fitted ``y_`` should be increasing (if set to True) or
        decreasing (if set to False).

    Returns
    -------
    y_ : ndarray of shape (n_samples,)
        Isotonic fit of y.

    References
    ----------
    "Active set algorithms for isotonic regression; A unifying framework"
    by Michael J. Best and Nilotpal Chakravarti, section 3.

    Examples
    --------
    >>> from sklearn.isotonic import isotonic_regression
    >>> isotonic_regression([5, 3, 1, 2, 8, 10, 7, 9, 6, 4])
    array([2.75 , 2.75 , 2.75 , 2.75 , 7.33...,
           7.33..., 7.33..., 7.33..., 7.33..., 7.33...])
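
    Bounds, when given, are enforced by clipping the fitted values:

    >>> isotonic_regression([5, 3, 1, 2, 8, 10, 7, 9, 6, 4], y_min=3, y_max=7)
    array([3., 3., 3., 3., 7., 7., 7., 7., 7., 7.])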
    """
    y = check_array(y, ensure_2d=False, input_name="y", dtype=[np.float64, np.float32])
    if sp_base_version >= parse_version("1.12.0"):
        # Delegate to the SciPy solver when it is available (SciPy >= 1.12).
        res = optimize.isotonic_regression(
            y=y, weights=sample_weight, increasing=increasing
        )
        y = np.asarray(res.x, dtype=y.dtype)
    else:
        # Fall back to the in-house Cython solver, which only handles the
        # increasing case: reverse the input (and the output) for a
        # decreasing fit.
        order = np.s_[:] if increasing else np.s_[::-1]
        y = np.array(y[order], dtype=y.dtype)
        sample_weight = _check_sample_weight(sample_weight, y, dtype=y.dtype, copy=True)
        sample_weight = np.ascontiguousarray(sample_weight[order])
        _inplace_contiguous_isotonic_regression(y, sample_weight)
        y = y[order]

    if y_min is not None or y_max is not None:
        # Translate missing bounds to +/- inf for np.clip.
        if y_min is None:
            y_min = -np.inf
        if y_max is None:
            y_max = np.inf
        np.clip(y, y_min, y_max, y)
    return y
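

# An illustrative, pure-Python sketch of the pool-adjacent-violators
# algorithm (PAVA) that the solvers used above implement. The name
# `_pava_reference` is a hypothetical helper added for exposition; it is not
# part of scikit-learn's API and is not used elsewhere in this module.
def _pava_reference(y, sample_weight=None):
    y = np.asarray(y, dtype=float)
    if sample_weight is None:
        w = np.ones_like(y)
    else:
        w = np.asarray(sample_weight, dtype=float)
    # One entry per merged block: weighted mean, total weight, point count.
    means, weights, counts = [], [], []
    for yi, wi in zip(y, w):
        means.append(yi)
        weights.append(wi)
        counts.append(1)
        # Pool the two rightmost blocks while their means are out of order.
        while len(means) > 1 and means[-2] > means[-1]:
            total = weights[-2] + weights[-1]
            means[-2] = (weights[-2] * means[-2] + weights[-1] * means[-1]) / total
            weights[-2] = total
            counts[-2] += counts[-1]
            del means[-1], weights[-1], counts[-1]
    # Expand each pooled mean back over its constituent points, e.g.
    # _pava_reference([5, 3, 1, 2, 8, 10, 7, 9, 6, 4]) pools the first four
    # points to 2.75 and the last six to 7.33..., matching the docstring
    # example of `isotonic_regression` above.
    return np.repeat(means, counts)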


class IsotonicRegression(RegressorMixin, TransformerMixin, BaseEstimator):
    """Isotonic regression model.

    Read more in the :ref:`User Guide <isotonic>`.

    .. versionadded:: 0.13

    Parameters
    ----------
    y_min : float, default=None
        Lower bound on the lowest predicted value (the minimum value may
        still be higher). If not set, defaults to -inf.

    y_max : float, default=None
        Upper bound on the highest predicted value (the maximum may still be
        lower). If not set, defaults to +inf.

    increasing : bool or 'auto', default=True
        Determines whether the predictions should be constrained to increase
        or decrease with `X`. 'auto' will decide based on the sign of the
        Spearman correlation estimate.

    out_of_bounds : {'nan', 'clip', 'raise'}, default='nan'
        Determines how `X` values outside of the training domain are handled
        during prediction.

        - 'nan', predictions will be NaN.
        - 'clip', predictions will be set to the value corresponding to
          the nearest train interval endpoint.
        - 'raise', a `ValueError` is raised.

    Attributes
    ----------
    X_min_ : float
        Minimum value of input array `X_` for left bound.

    X_max_ : float
        Maximum value of input array `X_` for right bound.

    X_thresholds_ : ndarray of shape (n_thresholds,)
        Unique ascending `X` values used to interpolate
        the y = f(X) monotonic function.

        .. versionadded:: 0.24

    y_thresholds_ : ndarray of shape (n_thresholds,)
        De-duplicated `y` values suitable to interpolate the y = f(X)
        monotonic function.

        .. versionadded:: 0.24

    f_ : function
        The stepwise interpolating function that covers the input domain ``X``.

    increasing_ : bool
        Inferred value for ``increasing``.

    See Also
    --------
    sklearn.linear_model.LinearRegression : Ordinary least squares Linear
        Regression.
    sklearn.ensemble.HistGradientBoostingRegressor : Gradient boosting that
        is a non-parametric model accepting monotonicity constraints.
    isotonic_regression : Function to solve the isotonic regression model.

    Notes
    -----
    Ties are broken using the secondary method from de Leeuw, 1977.

    References
    ----------
    Isotonic Median Regression: A Linear Programming Approach
    Nilotpal Chakravarti
    Mathematics of Operations Research
    Vol. 14, No. 2 (May, 1989), pp. 303-308

    Isotone Optimization in R : Pool-Adjacent-Violators
    Algorithm (PAVA) and Active Set Methods
    de Leeuw, Hornik, Mair
    Journal of Statistical Software 2009

    Correctness of Kruskal's algorithms for monotone regression with ties
    de Leeuw, Psychometrica, 1977

    Examples
    --------
    >>> from sklearn.datasets import make_regression
    >>> from sklearn.isotonic import IsotonicRegression
    >>> X, y = make_regression(n_samples=10, n_features=1, random_state=41)
    >>> iso_reg = IsotonicRegression().fit(X, y)
    >>> iso_reg.predict([.1, .2])
    array([1.8628..., 3.7256...])
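
    Values outside the training domain are handled according to
    ``out_of_bounds``; for instance, with clipping enabled (a minimal fit
    on already-monotonic data):

    >>> iso_clip = IsotonicRegression(out_of_bounds="clip").fit([1, 2, 3], [1, 2, 3])
    >>> iso_clip.predict([0, 4])
    array([1., 3.])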
    """

    # T should have been called X
    __metadata_request__predict = {"T": metadata_routing.UNUSED}
    __metadata_request__transform = {"T": metadata_routing.UNUSED}

    _parameter_constraints: dict = {
        "y_min": [Interval(Real, None, None, closed="both"), None],
        "y_max": [Interval(Real, None, None, closed="both"), None],
        "increasing": ["boolean", StrOptions({"auto"})],
        "out_of_bounds": [StrOptions({"nan", "clip", "raise"})],
    }

    def __init__(self, *, y_min=None, y_max=None, increasing=True, out_of_bounds="nan"):
        self.y_min = y_min
        self.y_max = y_max
        self.increasing = increasing
        self.out_of_bounds = out_of_bounds

    def _check_input_data_shape(self, X):
        if not (X.ndim == 1 or (X.ndim == 2 and X.shape[1] == 1)):
            msg = (
                "Isotonic regression input X should be a 1d array or "
                "2d array with 1 feature"
            )
            raise ValueError(msg)

    def _build_f(self, X, y):
        """Build the f_ interp1d function."""

        bounds_error = self.out_of_bounds == "raise"
        if len(y) == 1:
            # single y, constant prediction
            self.f_ = lambda x: y.repeat(x.shape)
        else:
            self.f_ = interpolate.interp1d(
                X, y, kind="linear", bounds_error=bounds_error
            )

    def _build_y(self, X, y, sample_weight, trim_duplicates=True):
        """Build the y_ IsotonicRegression."""
        self._check_input_data_shape(X)
        X = X.reshape(-1)  # use 1d view

        # Determine increasing if auto-determination requested
        if self.increasing == "auto":
            self.increasing_ = check_increasing(X, y)
        else:
            self.increasing_ = self.increasing

        # If sample_weight is passed, remove zero-weight values and sort the
        # remaining data.
        sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype)
        mask = sample_weight > 0
        X, y, sample_weight = X[mask], y[mask], sample_weight[mask]

        order = np.lexsort((y, X))
        X, y, sample_weight = [array[order] for array in [X, y, sample_weight]]
        unique_X, unique_y, unique_sample_weight = _make_unique(X, y, sample_weight)

        X = unique_X
        y = isotonic_regression(
            unique_y,
            sample_weight=unique_sample_weight,
            y_min=self.y_min,
            y_max=self.y_max,
            increasing=self.increasing_,
        )

        # Handle the left and right bounds on X
        self.X_min_, self.X_max_ = np.min(X), np.max(X)

        if trim_duplicates:
            # Remove unnecessary points for faster prediction: aside from the
            # first and last point, drop points whose y value equals both the
            # previous and the next one, e.g. for y = [1, 2, 2, 2, 3] the
            # middle 2 is redundant for linear interpolation.
            keep_data = np.ones((len(y),), dtype=bool)
            keep_data[1:-1] = np.logical_or(
                np.not_equal(y[1:-1], y[:-2]), np.not_equal(y[1:-1], y[2:])
            )
            return X[keep_data], y[keep_data]
        else:
            # The ability to turn off trim_duplicates is only used to make it
            # easier to unit test that removing redundant points does not
            # change the interpolation function (besides prediction speed).
            return X, y

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y, sample_weight=None):
        """Fit the model using X, y as training data.

        Parameters
        ----------
        X : array-like of shape (n_samples,) or (n_samples, 1)
            Training data.

            .. versionchanged:: 0.24
               Also accepts 2d array with 1 feature.

        y : array-like of shape (n_samples,)
            Training target.

        sample_weight : array-like of shape (n_samples,), default=None
            Weights. If set to None, all weights will be set to 1 (equal
            weights).

        Returns
        -------
        self : object
            Returns an instance of self.

        Notes
        -----
        X is stored for future use, as :meth:`transform` needs X to interpolate
        new input data.
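
        Examples
        --------
        A minimal fit; the stored thresholds are the pooled, de-duplicated
        training points:

        >>> from sklearn.isotonic import IsotonicRegression
        >>> iso = IsotonicRegression().fit([1, 2, 3], [1, 5, 3])
        >>> iso.X_thresholds_, iso.y_thresholds_
        (array([1., 2., 3.]), array([1., 4., 4.]))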
        """
        check_params = dict(accept_sparse=False, ensure_2d=False)
        X = check_array(
            X, input_name="X", dtype=[np.float64, np.float32], **check_params
        )
        y = check_array(y, input_name="y", dtype=X.dtype, **check_params)
        check_consistent_length(X, y, sample_weight)

        # Transform y by running the isotonic regression algorithm and
        # transform X accordingly.
        X, y = self._build_y(X, y, sample_weight)

        # It is necessary to store the non-redundant part of the training set
        # on the model to make it possible to support model persistence via
        # the pickle module, as the object built by scipy.interp1d is not
        # picklable directly.
        self.X_thresholds_, self.y_thresholds_ = X, y

        # Build the interpolation function
        self._build_f(X, y)
        return self

    def _transform(self, T):
        """`_transform` is called by both `transform` and `predict` methods.

        Since `transform` is wrapped to output arrays of specific types (e.g.
        NumPy arrays, pandas DataFrame), we cannot make `predict` call
        `transform` directly.

        The above behaviour could be changed in the future, if we decide to
        output other types of arrays when calling `predict`.
        """
        if hasattr(self, "X_thresholds_"):
            dtype = self.X_thresholds_.dtype
        else:
            dtype = np.float64

        T = check_array(T, dtype=dtype, ensure_2d=False)
        self._check_input_data_shape(T)
        T = T.reshape(-1)  # use 1d view

        if self.out_of_bounds == "clip":
            T = np.clip(T, self.X_min_, self.X_max_)

        res = self.f_(T)

        # interp1d may up-cast to float64, so cast back to the input dtype
        res = res.astype(T.dtype)

        return res

    def transform(self, T):
        """Transform new data by linear interpolation.

        Parameters
        ----------
        T : array-like of shape (n_samples,) or (n_samples, 1)
            Data to transform.

            .. versionchanged:: 0.24
               Also accepts 2d array with 1 feature.

        Returns
        -------
        y_pred : ndarray of shape (n_samples,)
            The transformed data.
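
        Examples
        --------
        Interpolation is linear between the fitted thresholds:

        >>> from sklearn.isotonic import IsotonicRegression
        >>> iso = IsotonicRegression().fit([1, 2, 3], [1, 5, 3])
        >>> iso.transform([1.5])
        array([2.5])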
        """
        return self._transform(T)

    def predict(self, T):
        """Predict new data by linear interpolation.

        Parameters
        ----------
        T : array-like of shape (n_samples,) or (n_samples, 1)
            Data to transform.

        Returns
        -------
        y_pred : ndarray of shape (n_samples,)
            Transformed data.
        """
        return self._transform(T)

    # We implement get_feature_names_out here instead of using
    # `ClassNamePrefixFeaturesOutMixin` because `input_features` are ignored:
    # `IsotonicRegression` accepts 1d input, so the semantics of
    # `feature_names_in_` are unclear for this estimator.
    def get_feature_names_out(self, input_features=None):
        """Get output feature names for transformation.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Ignored.

        Returns
        -------
        feature_names_out : ndarray of str objects
            An ndarray with one string i.e. ["isotonicregression0"].
        """
        check_is_fitted(self, "f_")
        class_name = self.__class__.__name__.lower()
        return np.asarray([f"{class_name}0"], dtype=object)

    def __getstate__(self):
        """Pickle-protocol - return state of the estimator."""
        state = super().__getstate__()
        # Remove the unpicklable interpolation function; __setstate__ rebuilds
        # it from the stored thresholds.
        state.pop("f_", None)
        return state

    def __setstate__(self, state):
        """Pickle-protocol - set state of the estimator.

        We need to rebuild the interpolation function.
        """
        super().__setstate__(state)
        if hasattr(self, "X_thresholds_") and hasattr(self, "y_thresholds_"):
            self._build_f(self.X_thresholds_, self.y_thresholds_)

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.one_d_array = True
        tags.input_tags.two_d_array = False
        return tags