File size: 3,367 Bytes
7885a28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import numpy as np
import pytest

from sklearn.impute._base import _BaseImputer
from sklearn.impute._iterative import _assign_where
from sklearn.utils._mask import _get_mask
from sklearn.utils._testing import _convert_container, assert_allclose


@pytest.fixture
def data():
    X = np.random.randn(10, 2)
    X[::2] = np.nan
    return X


class NoFitIndicatorImputer(_BaseImputer):
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return self._concatenate_indicator(X, self._transform_indicator(X))


class NoTransformIndicatorImputer(_BaseImputer):
    def fit(self, X, y=None):
        mask = _get_mask(X, value_to_mask=np.nan)
        super()._fit_indicator(mask)
        return self

    def transform(self, X, y=None):
        return self._concatenate_indicator(X, None)


class NoPrecomputedMaskFit(_BaseImputer):
    def fit(self, X, y=None):
        self._fit_indicator(X)
        return self

    def transform(self, X):
        return self._concatenate_indicator(X, self._transform_indicator(X))


class NoPrecomputedMaskTransform(_BaseImputer):
    def fit(self, X, y=None):
        mask = _get_mask(X, value_to_mask=np.nan)
        self._fit_indicator(mask)
        return self

    def transform(self, X):
        return self._concatenate_indicator(X, self._transform_indicator(X))


def test_base_imputer_not_fit(data):
    imputer = NoFitIndicatorImputer(add_indicator=True)
    err_msg = "Make sure to call _fit_indicator before _transform_indicator"
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit(data).transform(data)
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit_transform(data)


def test_base_imputer_not_transform(data):
    imputer = NoTransformIndicatorImputer(add_indicator=True)
    err_msg = (
        "Call _fit_indicator and _transform_indicator in the imputer implementation"
    )
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit(data).transform(data)
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit_transform(data)


def test_base_no_precomputed_mask_fit(data):
    imputer = NoPrecomputedMaskFit(add_indicator=True)
    err_msg = "precomputed is True but the input data is not a mask"
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit(data)
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit_transform(data)


def test_base_no_precomputed_mask_transform(data):
    imputer = NoPrecomputedMaskTransform(add_indicator=True)
    err_msg = "precomputed is True but the input data is not a mask"
    imputer.fit(data)
    with pytest.raises(ValueError, match=err_msg):
        imputer.transform(data)
    with pytest.raises(ValueError, match=err_msg):
        imputer.fit_transform(data)


@pytest.mark.parametrize("X1_type", ["array", "dataframe"])
def test_assign_where(X1_type):
    """Check the behaviour of the private helpers `_assign_where`."""
    rng = np.random.RandomState(0)

    n_samples, n_features = 10, 5
    X1 = _convert_container(rng.randn(n_samples, n_features), constructor_name=X1_type)
    X2 = rng.randn(n_samples, n_features)
    mask = rng.randint(0, 2, size=(n_samples, n_features)).astype(bool)

    _assign_where(X1, X2, mask)

    if X1_type == "dataframe":
        X1 = X1.to_numpy()
    assert_allclose(X1[mask], X2[mask])