"""
Test the fastica algorithm.
"""

import itertools
import os
import warnings

import numpy as np
import pytest
from scipy import stats

from sklearn.decomposition import PCA, FastICA, fastica
from sklearn.decomposition._fastica import _gs_decorrelation
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils._testing import assert_allclose, ignore_warnings


def center_and_norm(x, axis=-1):
    """Centers and norms x **in place**

    Parameters
    ----------
    x : ndarray
        Array with an axis of observations (statistical units) measured on
        random variables.
    axis : int, optional
        Axis along which the mean and variance are calculated.
    """
    x = np.rollaxis(x, axis)
    x -= x.mean(axis=0)
    x /= x.std(axis=0)
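

# A minimal sanity sketch for the helper above: after the call, each slice
# along the chosen axis has mean ~0 and unit standard deviation, and ``x``
# itself is modified (the function operates on a view of its input).
def test_center_and_norm_inplace():
    rng = np.random.RandomState(0)
    x = rng.randn(3, 50)
    center_and_norm(x)
    assert_allclose(x.mean(axis=-1), 0, atol=1e-12)
    assert_allclose(x.std(axis=-1), 1)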


def test_gs():
    # Test Gram-Schmidt orthonormalization.
    # Generate a random orthogonal matrix.
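    # For reference, ``_gs_decorrelation(w, W, j)`` subtracts from ``w`` (in
    # place) its projection onto the first ``j`` rows of ``W``, i.e. one
    # Gram-Schmidt deflation step: w <- w - (w @ W[:j].T) @ W[:j].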
    rng = np.random.RandomState(0)
    W, _, _ = np.linalg.svd(rng.randn(10, 10))
    w = rng.randn(10)
    _gs_decorrelation(w, W, 10)
    assert (w**2).sum() < 1.0e-10
    w = rng.randn(10)
    u = _gs_decorrelation(w, W, 5)
    tmp = np.dot(u, W.T)
    assert (tmp[:5] ** 2).sum() < 1.0e-10


def test_fastica_attributes_dtypes(global_dtype):
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10)).astype(global_dtype, copy=False)
    fica = FastICA(
        n_components=5, max_iter=1000, whiten="unit-variance", random_state=0
    ).fit(X)
    assert fica.components_.dtype == global_dtype
    assert fica.mixing_.dtype == global_dtype
    assert fica.mean_.dtype == global_dtype
    assert fica.whitening_.dtype == global_dtype


def test_fastica_return_dtypes(global_dtype):
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10)).astype(global_dtype, copy=False)
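    # fastica returns the pre-whitening matrix K, the square un-mixing matrix
    # W (bound to the local name ``mixing_`` below) and the estimated
    # sources S.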
    k_, mixing_, s_ = fastica(
        X, max_iter=1000, whiten="unit-variance", random_state=rng
    )
    assert k_.dtype == global_dtype
    assert mixing_.dtype == global_dtype
    assert s_.dtype == global_dtype


@pytest.mark.parametrize("add_noise", [True, False])
def test_fastica_simple(add_noise, global_random_seed, global_dtype):
    if (
        global_random_seed == 20
        and global_dtype == np.float32
        and not add_noise
        and os.getenv("DISTRIB") == "ubuntu"
    ):
        pytest.xfail(
            "FastICA instability with Ubuntu Atlas build with float32 "
            "global_dtype. For more details, see "
            "https://github.com/scikit-learn/scikit-learn/issues/24131#issuecomment-1208091119"  # noqa
        )

    # Test the FastICA algorithm on very simple data.
    rng = np.random.RandomState(global_random_seed)
    n_samples = 1000
    # Generate two sources:
    s1 = (2 * np.sin(np.linspace(0, 100, n_samples)) > 0) - 1
    s2 = stats.t.rvs(1, size=n_samples, random_state=global_random_seed)
    s = np.c_[s1, s2].T
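    # Stack the sources as rows: ``s`` has shape (2, n_samples).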
    center_and_norm(s)
    s = s.astype(global_dtype)
    s1, s2 = s

    # Mixing angle
    phi = 0.6
    mixing = np.array([[np.cos(phi), np.sin(phi)], [np.sin(phi), -np.cos(phi)]])
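    # ``mixing`` is orthogonal (a reflection), so it mixes the sources
    # without rescaling them.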
    mixing = mixing.astype(global_dtype)
    m = np.dot(mixing, s)

    if add_noise:
        m += 0.1 * rng.randn(2, 1000)

    center_and_norm(m)

    # Custom function passed as the ``fun`` argument:
    def g_test(x):
        return x**3, (3 * x**2).mean(axis=-1)
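    # A custom ``fun`` must return the elementwise value g(x) together with
    # the mean of its derivative g'(x) over the last axis; ``g_test`` mirrors
    # the built-in "cube" nonlinearity (g(x) = x**3, g'(x) = 3 * x**2).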

    algos = ["parallel", "deflation"]
    nls = ["logcosh", "exp", "cube", g_test]
    whitening = ["arbitrary-variance", "unit-variance", False]
    for algo, nl, whiten in itertools.product(algos, nls, whitening):
        if whiten:
            k_, mixing_, s_ = fastica(
                m.T, fun=nl, whiten=whiten, algorithm=algo, random_state=rng
            )
            with pytest.raises(ValueError):
                fastica(m.T, fun=np.tanh, whiten=whiten, algorithm=algo)
        else:
            pca = PCA(n_components=2, whiten=True, random_state=rng)
            X = pca.fit_transform(m.T)
            k_, mixing_, s_ = fastica(
                X, fun=nl, algorithm=algo, whiten=False, random_state=rng
            )
            with pytest.raises(ValueError):
                fastica(X, fun=np.tanh, algorithm=algo)
        s_ = s_.T
        # Check that the mixing model described in the docstring holds:
        if whiten:
            # XXX: exact reconstruction to standard relative tolerance is not
            # possible. This is probably expected when add_noise is True but we
            # also need a non-trivial atol in float32 when add_noise is False.
            #
            # Note that the 2 sources are non-Gaussian in this test.
            atol = 1e-5 if global_dtype == np.float32 else 0
            assert_allclose(np.dot(np.dot(mixing_, k_), m), s_, atol=atol)

        center_and_norm(s_)
        s1_, s2_ = s_
        # Check to see if the sources have been estimated
        # in the wrong order
        if abs(np.dot(s1_, s2)) > abs(np.dot(s1_, s1)):
            s2_, s1_ = s_
        s1_ *= np.sign(np.dot(s1_, s1))
        s2_ *= np.sign(np.dot(s2_, s2))

        # Check that we have estimated the original sources
        if not add_noise:
            assert_allclose(np.dot(s1_, s1) / n_samples, 1, atol=1e-2)
            assert_allclose(np.dot(s2_, s2) / n_samples, 1, atol=1e-2)
        else:
            assert_allclose(np.dot(s1_, s1) / n_samples, 1, atol=1e-1)
            assert_allclose(np.dot(s2_, s2) / n_samples, 1, atol=1e-1)

    # Test FastICA class
    _, _, sources_fun = fastica(
        m.T, fun=nl, algorithm=algo, random_state=global_random_seed
    )
    ica = FastICA(fun=nl, algorithm=algo, random_state=global_random_seed)
    sources = ica.fit_transform(m.T)
    assert ica.components_.shape == (2, 2)
    assert sources.shape == (1000, 2)

    assert_allclose(sources_fun, sources)
    # Set atol to account for the different magnitudes of the elements in sources
    # (from 1e-4 to 1e1).
    atol = np.max(np.abs(sources)) * (1e-5 if global_dtype == np.float32 else 1e-7)
    assert_allclose(sources, ica.transform(m.T), atol=atol)

    assert ica.mixing_.shape == (2, 2)

    ica = FastICA(fun=np.tanh, algorithm=algo)
    with pytest.raises(ValueError):
        ica.fit(m.T)


def test_fastica_nowhiten():
    m = [[0, 1], [1, 0]]

    # test for issue #697
    ica = FastICA(n_components=1, whiten=False, random_state=0)
    warn_msg = "Ignoring n_components with whiten=False."
    with pytest.warns(UserWarning, match=warn_msg):
        ica.fit(m)
    assert hasattr(ica, "mixing_")


def test_fastica_convergence_fail():
    # Test the FastICA algorithm on very simple data
    # (see test_non_square_fastica).
    # Ensure a ConvergenceWarning is raised if the tolerance is sufficiently
    # low.
    rng = np.random.RandomState(0)

    n_samples = 1000
    # Generate two sources:
    t = np.linspace(0, 100, n_samples)
    s1 = np.sin(t)
    s2 = np.ceil(np.sin(np.pi * t))
    s = np.c_[s1, s2].T
    center_and_norm(s)

    # Mixing matrix
    mixing = rng.randn(6, 2)
    m = np.dot(mixing, s)

    # Run FastICA with tol=0. to guarantee that convergence fails.
    warn_msg = (
        "FastICA did not converge. Consider increasing tolerance "
        "or the maximum number of iterations."
    )
    with pytest.warns(ConvergenceWarning, match=warn_msg):
        ica = FastICA(
            algorithm="parallel", n_components=2, random_state=rng, max_iter=2, tol=0.0
        )
        ica.fit(m.T)


@pytest.mark.parametrize("add_noise", [True, False])
def test_non_square_fastica(add_noise):
    # Test the FastICA algorithm on very simple data.
    rng = np.random.RandomState(0)

    n_samples = 1000
    # Generate two sources:
    t = np.linspace(0, 100, n_samples)
    s1 = np.sin(t)
    s2 = np.ceil(np.sin(np.pi * t))
    s = np.c_[s1, s2].T
    center_and_norm(s)
    s1, s2 = s

    # Mixing matrix
    mixing = rng.randn(6, 2)
    m = np.dot(mixing, s)

    if add_noise:
        m += 0.1 * rng.randn(6, n_samples)

    center_and_norm(m)

    k_, mixing_, s_ = fastica(
        m.T, n_components=2, whiten="unit-variance", random_state=rng
    )
    s_ = s_.T

    # Check that the mixing model described in the docstring holds:
    assert_allclose(s_, np.dot(np.dot(mixing_, k_), m))

    center_and_norm(s_)
    s1_, s2_ = s_
    # Check to see if the sources have been estimated
    # in the wrong order
    if abs(np.dot(s1_, s2)) > abs(np.dot(s1_, s1)):
        s2_, s1_ = s_
    s1_ *= np.sign(np.dot(s1_, s1))
    s2_ *= np.sign(np.dot(s2_, s2))

    # Check that we have estimated the original sources
    if not add_noise:
        assert_allclose(np.dot(s1_, s1) / n_samples, 1, atol=1e-3)
        assert_allclose(np.dot(s2_, s2) / n_samples, 1, atol=1e-3)


def test_fit_transform(global_random_seed, global_dtype):
    """Test unit variance of transformed data using FastICA algorithm.

    Check that `fit_transform` gives the same result as applying
    `fit` and then `transform`.

    Bug #13056
    """
    # multivariate uniform data in [0, 1]
    rng = np.random.RandomState(global_random_seed)
    X = rng.random_sample((100, 10)).astype(global_dtype)
    max_iter = 300
    for whiten, n_components in [["unit-variance", 5], [False, None]]:
        n_components_ = n_components if n_components is not None else X.shape[1]

        ica = FastICA(
            n_components=n_components, max_iter=max_iter, whiten=whiten, random_state=0
        )
        with warnings.catch_warnings():
            # make sure that numerical errors do not cause sqrt of negative
            # values
            warnings.simplefilter("error", RuntimeWarning)
            # XXX: for some seeds, the model does not converge.
            # However this is not what we test here.
            warnings.simplefilter("ignore", ConvergenceWarning)
            Xt = ica.fit_transform(X)
        assert ica.components_.shape == (n_components_, 10)
        assert Xt.shape == (X.shape[0], n_components_)

        ica2 = FastICA(
            n_components=n_components, max_iter=max_iter, whiten=whiten, random_state=0
        )
        with warnings.catch_warnings():
            # make sure that numerical errors do not cause sqrt of negative
            # values
            warnings.simplefilter("error", RuntimeWarning)
            warnings.simplefilter("ignore", ConvergenceWarning)
            ica2.fit(X)
        assert ica2.components_.shape == (n_components_, 10)
        Xt2 = ica2.transform(X)

        # XXX: we have to set atol for this test to pass for all seeds when
        # fitting with float32 data. Is this revealing a bug?
        if global_dtype == np.float32:
            atol = np.abs(Xt2).mean() / 1e6
        else:
            atol = 0.0  # the default rtol is enough for float64 data
        assert_allclose(Xt, Xt2, atol=atol)


@pytest.mark.filterwarnings("ignore:Ignoring n_components with whiten=False.")
@pytest.mark.parametrize(
    "whiten, n_components, expected_mixing_shape",
    [
        ("arbitrary-variance", 5, (10, 5)),
        ("arbitrary-variance", 10, (10, 10)),
        ("unit-variance", 5, (10, 5)),
        ("unit-variance", 10, (10, 10)),
        (False, 5, (10, 10)),
        (False, 10, (10, 10)),
    ],
)
def test_inverse_transform(
    whiten, n_components, expected_mixing_shape, global_random_seed, global_dtype
):
    # Test FastICA.inverse_transform
    n_samples = 100
    rng = np.random.RandomState(global_random_seed)
    X = rng.random_sample((n_samples, 10)).astype(global_dtype)

    ica = FastICA(n_components=n_components, random_state=rng, whiten=whiten)
    with warnings.catch_warnings():
        # For some datasets (depending on the value of global_dtype) the
        # model can fail to converge, but this should not impact the
        # definition of a valid inverse transform.
        warnings.simplefilter("ignore", ConvergenceWarning)
        Xt = ica.fit_transform(X)
    assert ica.mixing_.shape == expected_mixing_shape
    X2 = ica.inverse_transform(Xt)
    assert X.shape == X2.shape

    # reversibility test in non-reduction case
    if n_components == X.shape[1]:
        # XXX: we have to set atol for this test to pass for all seeds when
        # fitting with float32 data. Is this revealing a bug?
        if global_dtype == np.float32:
            # XXX: dividing by a smaller number makes
            # tests fail for some seeds.
            atol = np.abs(X2).mean() / 1e5
        else:
            atol = 0.0  # the default rtol is enough for float64 data
        assert_allclose(X, X2, atol=atol)


def test_fastica_errors():
    n_features = 3
    n_samples = 10
    rng = np.random.RandomState(0)
    X = rng.random_sample((n_samples, n_features))
    w_init = rng.randn(n_features + 1, n_features + 1)
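    # w_init is deliberately one row and one column too large; the expected
    # shape is (n_features, n_features) = (3, 3).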
    with pytest.raises(ValueError, match=r"alpha must be in \[1,2\]"):
        fastica(X, fun_args={"alpha": 0})
    with pytest.raises(
        ValueError, match="w_init has invalid shape.+" r"should be \(3L?, 3L?\)"
    ):
        fastica(X, w_init=w_init)


def test_fastica_whiten_unit_variance():
    """Test unit variance of transformed data using FastICA algorithm.

    Bug #13056
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10))
    n_components = X.shape[1]
    ica = FastICA(n_components=n_components, whiten="unit-variance", random_state=0)
    Xt = ica.fit_transform(X)

    assert np.var(Xt) == pytest.approx(1.0)


@pytest.mark.parametrize("whiten", ["arbitrary-variance", "unit-variance", False])
@pytest.mark.parametrize("return_X_mean", [True, False])
@pytest.mark.parametrize("return_n_iter", [True, False])
def test_fastica_output_shape(whiten, return_X_mean, return_n_iter):
    n_features = 3
    n_samples = 10
    rng = np.random.RandomState(0)
    X = rng.random_sample((n_samples, n_features))

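    # fastica always returns (K, W, S); each requested extra output appends
    # one element, and the boolean flags add as 0/1 here.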
    expected_len = 3 + return_X_mean + return_n_iter

    out = fastica(
        X, whiten=whiten, return_n_iter=return_n_iter, return_X_mean=return_X_mean
    )

    assert len(out) == expected_len
    if not whiten:
        assert out[0] is None


@pytest.mark.parametrize("add_noise", [True, False])
def test_fastica_simple_different_solvers(add_noise, global_random_seed):
    """Test FastICA is consistent between whiten_solvers."""
    rng = np.random.RandomState(global_random_seed)
    n_samples = 1000
    # Generate two sources:
    s1 = (2 * np.sin(np.linspace(0, 100, n_samples)) > 0) - 1
    s2 = stats.t.rvs(1, size=n_samples, random_state=rng)
    s = np.c_[s1, s2].T
    center_and_norm(s)
    s1, s2 = s

    # Mixing angle
    phi = rng.rand() * 2 * np.pi
    mixing = np.array([[np.cos(phi), np.sin(phi)], [np.sin(phi), -np.cos(phi)]])
    m = np.dot(mixing, s)

    if add_noise:
        m += 0.1 * rng.randn(2, 1000)

    center_and_norm(m)

    outs = {}
    for solver in ("svd", "eigh"):
        ica = FastICA(random_state=0, whiten="unit-variance", whiten_solver=solver)
        sources = ica.fit_transform(m.T)
        outs[solver] = sources
        assert ica.components_.shape == (2, 2)
        assert sources.shape == (1000, 2)

    # The compared values are not all of the same magnitude, so use a small
    # atol to make the test less brittle.
    assert_allclose(outs["eigh"], outs["svd"], atol=1e-12)


def test_fastica_eigh_low_rank_warning(global_random_seed):
    """Test FastICA eigh solver raises warning for low-rank data."""
    rng = np.random.RandomState(global_random_seed)
    A = rng.randn(10, 2)
    X = A @ A.T
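    # X is the 10x10 Gram matrix of a 10x2 factor and therefore has rank 2,
    # so whitening encounters (near-)zero singular values.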
    ica = FastICA(random_state=0, whiten="unit-variance", whiten_solver="eigh")
    msg = "There are some small singular values"

    with pytest.warns(UserWarning, match=msg):
        with ignore_warnings(category=ConvergenceWarning):
            # The FastICA solver may not converge for some data with specific
            # random seeds, but this happens after the whitening step, so it
            # is not what we want to test here.
            ica.fit(X)