|
import re |
|
import warnings |
|
|
|
import numpy as np |
|
import pytest |
|
import scipy as sp |
|
from numpy.testing import assert_array_equal |
|
|
|
from sklearn import config_context, datasets |
|
from sklearn.base import clone |
|
from sklearn.datasets import load_iris, make_classification, make_low_rank_matrix |
|
from sklearn.decomposition import PCA |
|
from sklearn.decomposition._pca import _assess_dimension, _infer_dimension |
|
from sklearn.utils._array_api import ( |
|
_atol_for_type, |
|
_convert_to_numpy, |
|
yield_namespace_device_dtype_combinations, |
|
) |
|
from sklearn.utils._array_api import device as array_device |
|
from sklearn.utils._test_common.instance_generator import _get_check_estimator_ids |
|
from sklearn.utils._testing import _array_api_for_tests, assert_allclose |
|
from sklearn.utils.estimator_checks import ( |
|
check_array_api_input_and_values, |
|
) |
|
from sklearn.utils.fixes import CSC_CONTAINERS, CSR_CONTAINERS |
|
|
|
# Shared fixture: the iris dataset is reused by many tests below.
iris = datasets.load_iris()
# Every svd_solver value accepted by PCA; parametrized over in most tests.
PCA_SOLVERS = ["full", "covariance_eigh", "arpack", "randomized", "auto"]

# Shape of the random sparse matrices used by the sparse-input tests.
SPARSE_M, SPARSE_N = 1000, 300
SPARSE_MAX_COMPONENTS = min(SPARSE_M, SPARSE_N)
|
|
|
|
|
def _check_fitted_pca_close(pca1, pca2, rtol=1e-7, atol=1e-12):
    """Assert that two fitted PCA estimators agree on all fitted attributes.

    Array-valued attributes are compared with the given tolerances; the
    integer-valued bookkeeping attributes must match exactly.
    """
    array_attrs = (
        "components_",
        "explained_variance_",
        "singular_values_",
        "mean_",
        "noise_variance_",
    )
    for attr in array_attrs:
        assert_allclose(
            getattr(pca1, attr), getattr(pca2, attr), rtol=rtol, atol=atol
        )

    for attr in ("n_components_", "n_samples_", "n_features_in_"):
        assert getattr(pca1, attr) == getattr(pca2, attr)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
@pytest.mark.parametrize("n_components", range(1, iris.data.shape[1]))
def test_pca(svd_solver, n_components):
    """Smoke-test fit/transform/fit_transform and (co)variance on iris."""
    data = iris.data
    estimator = PCA(n_components=n_components, svd_solver=svd_solver)

    # fit followed by transform keeps the requested dimensionality.
    projected = estimator.fit(data).transform(data)
    assert projected.shape[1] == n_components

    # fit_transform must agree with fit().transform().
    projected_ft = estimator.fit_transform(data)
    assert_allclose(projected, projected_ft)
    projected = estimator.transform(data)
    assert_allclose(projected, projected_ft)

    # Covariance and precision matrices are inverses of each other.
    covariance = estimator.get_covariance()
    precision = estimator.get_precision()
    assert_allclose(covariance @ precision, np.eye(data.shape[1]), atol=1e-12)
|
|
|
|
|
@pytest.mark.parametrize("density", [0.01, 0.1, 0.30])
@pytest.mark.parametrize("n_components", [1, 2, 10])
@pytest.mark.parametrize("sparse_container", CSR_CONTAINERS + CSC_CONTAINERS)
@pytest.mark.parametrize("svd_solver", ["arpack", "covariance_eigh"])
@pytest.mark.parametrize("scale", [1, 10, 100])
def test_pca_sparse(
    global_random_seed, svd_solver, sparse_container, n_components, density, scale
):
    """Check that the results are the same for sparse and dense input."""
    # Tolerances for the fitted attributes and the transformed outputs.
    atol = 1e-12
    transform_atol = 1e-10

    random_state = np.random.default_rng(global_random_seed)
    X = sparse_container(
        sp.sparse.random(
            SPARSE_M,
            SPARSE_N,
            random_state=random_state,
            density=density,
        )
    )

    # Rescale the columns to exercise heterogeneous feature scales.
    scale_vector = random_state.random(X.shape[1]) * scale
    X = X.multiply(scale_vector)

    # Fit one estimator on the sparse matrix ...
    pca = PCA(
        n_components=n_components,
        svd_solver=svd_solver,
        random_state=global_random_seed,
    )
    pca.fit(X)

    # ... and an identically-configured one on the dense counterpart.
    Xd = X.toarray()
    pcad = PCA(
        n_components=n_components,
        svd_solver=svd_solver,
        random_state=global_random_seed,
    )
    pcad.fit(Xd)

    # All fitted attributes must agree.
    _check_fitted_pca_close(pca, pcad, atol=atol)

    # Transforming fresh data must also agree, both across input formats for
    # a single model and across the two fitted models.
    X2 = sparse_container(
        sp.sparse.random(
            SPARSE_M,
            SPARSE_N,
            random_state=random_state,
            density=density,
        )
    )
    X2d = X2.toarray()

    assert_allclose(pca.transform(X2), pca.transform(X2d), atol=transform_atol)
    assert_allclose(pca.transform(X2), pcad.transform(X2d), atol=transform_atol)
|
|
|
|
|
@pytest.mark.parametrize("sparse_container", CSR_CONTAINERS + CSC_CONTAINERS)
def test_pca_sparse_fit_transform(global_random_seed, sparse_container):
    """Check fit_transform equals fit().transform() on sparse input."""
    random_state = np.random.default_rng(global_random_seed)
    X = sparse_container(
        sp.sparse.random(
            SPARSE_M,
            SPARSE_N,
            random_state=random_state,
            density=0.01,
        )
    )
    # A second sparse matrix to check transform on unseen data.
    X2 = sparse_container(
        sp.sparse.random(
            SPARSE_M,
            SPARSE_N,
            random_state=random_state,
            density=0.01,
        )
    )

    # Two identically-configured estimators: one via fit, one via fit_transform.
    pca_fit = PCA(n_components=10, svd_solver="arpack", random_state=global_random_seed)
    pca_fit_transform = PCA(
        n_components=10, svd_solver="arpack", random_state=global_random_seed
    )

    pca_fit.fit(X)
    transformed_X = pca_fit_transform.fit_transform(X)

    # Fitted attributes and every transform path must agree.
    _check_fitted_pca_close(pca_fit, pca_fit_transform)
    assert_allclose(transformed_X, pca_fit_transform.transform(X))
    assert_allclose(transformed_X, pca_fit.transform(X))
    assert_allclose(pca_fit.transform(X2), pca_fit_transform.transform(X2))
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["randomized", "full"])
@pytest.mark.parametrize("sparse_container", CSR_CONTAINERS + CSC_CONTAINERS)
def test_sparse_pca_solver_error(global_random_seed, svd_solver, sparse_container):
    """Solvers other than arpack/covariance_eigh must reject sparse input."""
    rng = np.random.RandomState(global_random_seed)
    X_sparse = sparse_container(
        sp.sparse.random(
            SPARSE_M,
            SPARSE_N,
            random_state=rng,
        )
    )
    estimator = PCA(n_components=30, svd_solver=svd_solver)
    expected_pattern = (
        'PCA only support sparse inputs with the "arpack" and "covariance_eigh"'
        f' solvers, while "{svd_solver}" was passed'
    )
    with pytest.raises(TypeError, match=expected_pattern):
        estimator.fit(X_sparse)
|
|
|
|
|
@pytest.mark.parametrize("sparse_container", CSR_CONTAINERS + CSC_CONTAINERS)
def test_sparse_pca_auto_arpack_singluar_values_consistency(
    global_random_seed, sparse_container
):
    # NOTE(review): "singluar" is a typo in the test name; it is kept as-is
    # because renaming would change the collected test id.
    """Check that "auto" and "arpack" solvers are equivalent for sparse inputs."""
    random_state = np.random.RandomState(global_random_seed)
    X = sparse_container(
        sp.sparse.random(
            SPARSE_M,
            SPARSE_N,
            random_state=random_state,
        )
    )
    pca_arpack = PCA(n_components=10, svd_solver="arpack").fit(X)
    pca_auto = PCA(n_components=10, svd_solver="auto").fit(X)
    # Only the singular values are compared here (loose relative tolerance).
    assert_allclose(pca_arpack.singular_values_, pca_auto.singular_values_, rtol=5e-3)
|
|
|
|
|
def test_no_empty_slice_warning():
    """Fitting with n_components == n_samples must not emit a RuntimeWarning."""
    n_components = 10
    n_features = n_components + 2
    data = np.random.uniform(-1, 1, size=(n_components, n_features))
    estimator = PCA(n_components=n_components)
    with warnings.catch_warnings():
        # Promote any RuntimeWarning (e.g. "mean of empty slice") to an error.
        warnings.simplefilter("error", RuntimeWarning)
        estimator.fit(data)
|
|
|
|
|
@pytest.mark.parametrize("copy", [True, False])
@pytest.mark.parametrize("solver", PCA_SOLVERS)
def test_whitening(solver, copy):
    """Whitened projections have unit variance and zero mean per component."""
    rng = np.random.RandomState(0)
    n_samples = 100
    n_features = 80
    n_components = 30
    rank = 50

    # Low-rank data with a linearly decaying spectrum ...
    X = np.dot(
        rng.randn(n_samples, rank),
        np.dot(np.diag(np.linspace(10.0, 1.0, rank)), rng.randn(rank, n_features)),
    )

    # ... and heterogeneous feature scales on the first 50 columns.
    X[:, :50] *= 3

    assert X.shape == (n_samples, n_features)

    # Sanity check: the per-feature standard deviations have non-trivial spread.
    assert X.std(axis=0).std() > 43.8

    # Whitened case: projections must be decorrelated with unit variance.
    X_ = X.copy()
    pca = PCA(
        n_components=n_components,
        whiten=True,
        copy=copy,
        svd_solver=solver,
        random_state=0,
        iterated_power=7,
    )

    X_whitened = pca.fit_transform(X_.copy())
    assert X_whitened.shape == (n_samples, n_components)
    # fit_transform and transform must agree (within solver accuracy).
    X_whitened2 = pca.transform(X_)
    assert_allclose(X_whitened, X_whitened2, rtol=5e-4)

    assert_allclose(X_whitened.std(ddof=1, axis=0), np.ones(n_components))
    assert_allclose(X_whitened.mean(axis=0), np.zeros(n_components), atol=1e-12)

    # Unwhitened case: component scales are preserved.
    X_ = X.copy()
    pca = PCA(
        n_components=n_components, whiten=False, copy=copy, svd_solver=solver
    ).fit(X_.copy())
    X_unwhitened = pca.transform(X_)
    assert X_unwhitened.shape == (n_samples, n_components)

    # The projection keeps a non-trivial spread of component scales.
    assert X_unwhitened.std(axis=0).std() == pytest.approx(74.1, rel=1e-1)
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    "other_svd_solver", sorted(list(set(PCA_SOLVERS) - {"full", "auto"}))
)
@pytest.mark.parametrize("data_shape", ["tall", "wide"])
@pytest.mark.parametrize("rank_deficient", [False, True])
@pytest.mark.parametrize("whiten", [False, True])
def test_pca_solver_equivalence(
    other_svd_solver,
    data_shape,
    rank_deficient,
    whiten,
    global_random_seed,
    global_dtype,
):
    """Check that every solver agrees with the exact "full" solver.

    Covers tall/wide and full-rank/rank-deficient data, with and without
    whitening, comparing fitted attributes, train/test projections and
    round-trip reconstructions.
    """
    if data_shape == "tall":
        n_samples, n_features = 100, 30
    else:
        n_samples, n_features = 30, 100
    n_samples_test = 10

    if rank_deficient:
        # Explicit low-rank factorization: rank is half the smallest dimension.
        rng = np.random.default_rng(global_random_seed)
        rank = min(n_samples, n_features) // 2
        X = rng.standard_normal(
            size=(n_samples + n_samples_test, rank)
        ) @ rng.standard_normal(size=(rank, n_features))
    else:
        X = make_low_rank_matrix(
            n_samples=n_samples + n_samples_test,
            n_features=n_features,
            tail_strength=0.5,
            random_state=global_random_seed,
        )
        # NOTE(review): despite the helper's name, the data is treated as
        # numerically full rank here (tail_strength=0.5) — confirm upstream.
        rank = min(n_samples, n_features)

    X = X.astype(global_dtype, copy=False)
    X_train, X_test = X[:n_samples], X[n_samples:]

    # Looser tolerances in float32; variance_threshold separates numerically
    # stable components from noise-level ones.
    if global_dtype == np.float32:
        tols = dict(atol=3e-2, rtol=1e-5)
        variance_threshold = 1e-5
    else:
        tols = dict(atol=1e-10, rtol=1e-12)
        variance_threshold = 1e-12

    extra_other_kwargs = {}
    if other_svd_solver == "randomized":
        # Few components and many power iterations so that the randomized
        # approximation is accurate enough for a tight comparison.
        n_components = 10
        extra_other_kwargs = {"iterated_power": 50}
    elif other_svd_solver == "arpack":
        # arpack requires n_components < min(n_samples, n_features).
        n_components = np.minimum(n_samples, n_features) - 1
    else:
        # Exact solvers keep every component.
        n_components = None

    pca_full = PCA(n_components=n_components, svd_solver="full", whiten=whiten)
    pca_other = PCA(
        n_components=n_components,
        svd_solver=other_svd_solver,
        whiten=whiten,
        random_state=global_random_seed,
        **extra_other_kwargs,
    )
    X_trans_full_train = pca_full.fit_transform(X_train)
    assert np.isfinite(X_trans_full_train).all()
    assert X_trans_full_train.dtype == global_dtype
    X_trans_other_train = pca_other.fit_transform(X_train)
    assert np.isfinite(X_trans_other_train).all()
    assert X_trans_other_train.dtype == global_dtype

    # Explained variances must be non-negative and agree between solvers.
    assert (pca_full.explained_variance_ >= 0).all()
    assert_allclose(pca_full.explained_variance_, pca_other.explained_variance_, **tols)
    assert_allclose(
        pca_full.explained_variance_ratio_,
        pca_other.explained_variance_ratio_,
        **tols,
    )
    reference_components = pca_full.components_
    assert np.isfinite(reference_components).all()
    other_components = pca_other.components_
    assert np.isfinite(other_components).all()

    # Components whose explained variance is at noise level are numerically
    # unstable; restrict the elementwise comparisons to the stable ones.
    stable = pca_full.explained_variance_ > variance_threshold
    assert stable.sum() > 1
    assert_allclose(reference_components[stable], other_components[stable], **tols)

    assert_allclose(
        X_trans_other_train[:, stable], X_trans_full_train[:, stable], **tols
    )

    # The same agreement must hold on held-out data.
    X_trans_full_test = pca_full.transform(X_test)
    assert np.isfinite(X_trans_full_test).all()
    assert X_trans_full_test.dtype == global_dtype
    X_trans_other_test = pca_other.transform(X_test)
    assert np.isfinite(X_trans_other_test).all()
    assert X_trans_other_test.dtype == global_dtype
    assert_allclose(X_trans_other_test[:, stable], X_trans_full_test[:, stable], **tols)

    # Round-trip through inverse_transform.
    X_recons_full_test = pca_full.inverse_transform(X_trans_full_test)
    assert np.isfinite(X_recons_full_test).all()
    assert X_recons_full_test.dtype == global_dtype
    X_recons_other_test = pca_other.inverse_transform(X_trans_other_test)
    assert np.isfinite(X_recons_other_test).all()
    assert X_recons_other_test.dtype == global_dtype

    if pca_full.components_.shape[0] == pca_full.components_.shape[1]:
        # Square components matrix: the projection is invertible, so both
        # reconstructions should recover the test data exactly.
        assert_allclose(X_recons_full_test, X_test, **tols)
        assert_allclose(X_recons_other_test, X_test, **tols)
    elif pca_full.components_.shape[0] < rank:
        # Strict truncation of full-rank data: reconstructions are lossy but
        # both solvers should discard the same information.
        assert pca_full.explained_variance_.min() > variance_threshold
        assert_allclose(X_recons_full_test, X_recons_other_test, **tols)
    else:
        # Some kept components carry noise-level variance: compare only the
        # projections of the reconstructions onto the stable components.
        assert_allclose(
            pca_full.transform(X_recons_full_test)[:, stable],
            pca_other.transform(X_recons_other_test)[:, stable],
            **tols,
        )
|
|
|
|
|
@pytest.mark.parametrize(
    "X",
    [
        np.random.RandomState(0).randn(100, 80),
        datasets.make_classification(100, 80, n_informative=78, random_state=0)[0],
        np.random.RandomState(0).randn(10, 100),
    ],
    ids=["random-tall", "correlated-tall", "random-wide"],
)
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_explained_variance_empirical(X, svd_solver):
    """explained_variance_ matches the empirical variance of the projection."""
    pca = PCA(n_components=2, svd_solver=svd_solver, random_state=0)
    X_pca = pca.fit_transform(X)
    assert_allclose(pca.explained_variance_, np.var(X_pca, ddof=1, axis=0))

    # Cross-check against the top-2 eigenvalues of the covariance matrix.
    expected_result = np.linalg.eig(np.cov(X, rowvar=False))[0]
    expected_result = sorted(expected_result, reverse=True)[:2]
    assert_allclose(pca.explained_variance_, expected_result, rtol=5e-3)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["arpack", "randomized"])
def test_pca_singular_values_consistency(svd_solver):
    """Approximate solvers reproduce the exact solver's singular values."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 100, 80
    X = rng.randn(n_samples, n_features)

    reference = PCA(n_components=2, svd_solver="full", random_state=rng)
    candidate = PCA(n_components=2, svd_solver=svd_solver, random_state=rng)

    reference.fit(X)
    candidate.fit(X)

    assert_allclose(reference.singular_values_, candidate.singular_values_, rtol=5e-3)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_singular_values(svd_solver):
    """Check singular values against norms of the transformed data."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 100, 80
    X = rng.randn(n_samples, n_features)

    pca = PCA(n_components=2, svd_solver=svd_solver, random_state=rng)
    X_trans = pca.fit_transform(X)

    # Sum of squared singular values equals the squared Frobenius norm of the
    # projected data.
    assert_allclose(
        np.sum(pca.singular_values_**2), np.linalg.norm(X_trans, "fro") ** 2
    )
    # Each singular value is the 2-norm of the corresponding projected column.
    assert_allclose(pca.singular_values_, np.sqrt(np.sum(X_trans**2, axis=0)))

    # Build data whose singular values are known (3.142, 2.718, 1.0) and
    # verify PCA recovers them.
    n_samples, n_features = 100, 110
    X = rng.randn(n_samples, n_features)

    pca = PCA(n_components=3, svd_solver=svd_solver, random_state=rng)
    X_trans = pca.fit_transform(X)
    # Normalize each projected column, then rescale the first two.
    X_trans /= np.sqrt(np.sum(X_trans**2, axis=0))
    X_trans[:, 0] *= 3.142
    X_trans[:, 1] *= 2.718
    X_hat = np.dot(X_trans, pca.components_)
    pca.fit(X_hat)
    assert_allclose(pca.singular_values_, [3.142, 2.718, 1.0])
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_check_projection(svd_solver):
    """A point from the dominant cluster projects onto the first component."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 100, 3
    X_train = rng.randn(n_samples, n_features) * 0.1
    # Shift the first ten samples to create a dominant direction.
    X_train[:10] += np.array([3, 4, 5])
    X_query = 0.1 * rng.randn(1, n_features) + np.array([3, 4, 5])

    projection = (
        PCA(n_components=2, svd_solver=svd_solver).fit(X_train).transform(X_query)
    )
    projection /= np.sqrt(np.sum(projection**2))

    # After normalization, (almost) all mass lies on the first component.
    assert_allclose(np.abs(projection[0][0]), 1.0, rtol=5e-3)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_check_projection_list(svd_solver):
    """PCA accepts a plain list of lists as input.

    Checks the shape and simple statistics of the one-component projection.
    """
    X = [[1.0, 0.0], [0.0, 1.0]]
    pca = PCA(n_components=1, svd_solver=svd_solver, random_state=0)
    X_trans = pca.fit_transform(X)
    # Bug fix: the original `assert X_trans.shape, (2, 1)` only asserted the
    # truthiness of the shape tuple (always True); compare it explicitly.
    assert X_trans.shape == (2, 1)
    assert_allclose(X_trans.mean(), 0.00, atol=1e-12)
    assert_allclose(X_trans.std(), 0.71, rtol=5e-3)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["full", "arpack", "randomized"])
@pytest.mark.parametrize("whiten", [False, True])
def test_pca_inverse(svd_solver, whiten):
    """inverse_transform recovers data that is (nearly) two-dimensional."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 50, 3
    data = rng.randn(n_samples, n_features)
    # Shrink the middle feature so that two components capture the signal.
    data[:, 1] *= 0.00001
    data += [5, 4, 3]

    model = PCA(n_components=2, svd_solver=svd_solver, whiten=whiten).fit(data)
    projected = model.transform(data)
    reconstructed = model.inverse_transform(projected)
    assert_allclose(data, reconstructed, rtol=5e-6)
|
|
|
|
|
@pytest.mark.parametrize(
    "data", [np.array([[0, 1, 0], [1, 0, 0]]), np.array([[0, 1, 0], [1, 0, 0]]).T]
)
@pytest.mark.parametrize(
    "svd_solver, n_components, err_msg",
    [
        ("arpack", 0, r"must be between 1 and min\(n_samples, n_features\)"),
        ("randomized", 0, r"must be between 1 and min\(n_samples, n_features\)"),
        ("arpack", 2, r"must be strictly less than min"),
        (
            "auto",
            3,
            (
                r"n_components=3 must be between 0 and min\(n_samples, "
                r"n_features\)=2 with svd_solver='full'"
            ),
        ),
    ],
)
def test_pca_validation(svd_solver, data, n_components, err_msg):
    """Invalid n_components/solver combinations raise informative errors."""
    # Smallest dimension of both test matrices (2x3 and its transpose).
    smallest_d = 2
    pca_fitted = PCA(n_components, svd_solver=svd_solver)

    with pytest.raises(ValueError, match=err_msg):
        pca_fitted.fit(data)

    # arpack additionally rejects n_components == min(n_samples, n_features).
    if svd_solver == "arpack":
        n_components = smallest_d

        # The "L?" allows for an optional long-integer suffix in the message.
        err_msg = (
            "n_components={}L? must be strictly less than "
            r"min\(n_samples, n_features\)={}L? with "
            "svd_solver='arpack'".format(n_components, smallest_d)
        )
        with pytest.raises(ValueError, match=err_msg):
            PCA(n_components, svd_solver=svd_solver).fit(data)
|
|
|
|
|
@pytest.mark.parametrize(
    "solver, n_components_",
    [
        ("full", min(iris.data.shape)),
        ("arpack", min(iris.data.shape) - 1),
        ("randomized", min(iris.data.shape)),
    ],
)
@pytest.mark.parametrize("data", [iris.data, iris.data.T])
def test_n_components_none(data, solver, n_components_):
    """With the default n_components, each solver keeps its maximum count."""
    estimator = PCA(svd_solver=solver)
    estimator.fit(data)
    assert estimator.n_components_ == n_components_
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["auto", "full"])
def test_n_components_mle(svd_solver):
    """n_components='mle' works with the exact solvers and selects 1 here."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 600, 10
    data = rng.randn(n_samples, n_features)
    estimator = PCA(n_components="mle", svd_solver=svd_solver)
    estimator.fit(data)
    assert estimator.n_components_ == 1
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["arpack", "randomized"])
def test_n_components_mle_error(svd_solver):
    """n_components='mle' is rejected by the approximate solvers."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 600, 10
    data = rng.randn(n_samples, n_features)
    estimator = PCA(n_components="mle", svd_solver=svd_solver)
    expected_msg = "n_components='mle' cannot be a string with svd_solver='{}'".format(
        svd_solver
    )
    with pytest.raises(ValueError, match=expected_msg):
        estimator.fit(data)
|
|
|
|
|
def test_pca_dim():
    """n_components='mle' with the full solver finds the single strong direction."""
    rng = np.random.RandomState(0)
    n_samples, n_features = 100, 5
    data = rng.randn(n_samples, n_features) * 0.1
    # A single shifted cluster creates one dominant direction.
    data[:10] += np.array([3, 4, 5, 1, 2])
    estimator = PCA(n_components="mle", svd_solver="full").fit(data)
    # The constructor parameter is stored verbatim ...
    assert estimator.n_components == "mle"
    # ... while the fitted attribute holds the inferred dimensionality.
    assert estimator.n_components_ == 1
|
|
|
|
|
def test_infer_dim_1():
    """_assess_dimension peaks near the true rank of rank-1-plus-noise data."""
    n, p = 1000, 5
    rng = np.random.RandomState(0)
    # Rank-one structure plus isotropic noise and a constant offset.
    X = (
        rng.randn(n, p) * 0.1
        + rng.randn(n, 1) * np.array([3, 4, 5, 1, 2])
        + np.array([1, 0, 7, 4, 6])
    )
    pca = PCA(n_components=p, svd_solver="full")
    pca.fit(X)
    spect = pca.explained_variance_
    # Log-likelihood of each candidate rank; the entry at index 1 should be
    # close to the maximum.
    ll = np.array([_assess_dimension(spect, k, n) for k in range(1, p)])
    assert ll[1] > ll.max() - 0.01 * n
|
|
|
|
|
def test_infer_dim_2():
    """_infer_dimension detects more than one informative direction."""
    n_samples, n_features = 1000, 5
    rng = np.random.RandomState(0)
    data = rng.randn(n_samples, n_features) * 0.1
    # Two shifted clusters create two strong directions on top of the noise.
    data[:10] += np.array([3, 4, 5, 1, 2])
    data[10:20] += np.array([6, 0, 7, 2, -1])
    model = PCA(n_components=n_features, svd_solver="full")
    model.fit(data)
    spectrum = model.explained_variance_
    assert _infer_dimension(spectrum, n_samples) > 1
|
|
|
|
|
def test_infer_dim_3():
    """_infer_dimension detects more than two directions in tri-modal data."""
    n_samples, n_features = 100, 5
    rng = np.random.RandomState(0)
    data = rng.randn(n_samples, n_features) * 0.1
    # Three shifted groups create three strong directions above the noise.
    data[:10] += np.array([3, 4, 5, 1, 2])
    data[10:20] += np.array([6, 0, 7, 2, -1])
    data[30:40] += 2 * np.array([-1, 1, -1, 1, -1])
    model = PCA(n_components=n_features, svd_solver="full")
    model.fit(data)
    spectrum = model.explained_variance_
    assert _infer_dimension(spectrum, n_samples) > 2
|
|
|
|
|
@pytest.mark.parametrize(
    "X, n_components, n_components_validated",
    [
        (iris.data, 0.95, 2),
        (iris.data, 0.01, 1),
        (np.random.RandomState(0).rand(5, 20), 0.5, 2),
    ],
)
def test_infer_dim_by_explained_variance(X, n_components, n_components_validated):
    """A float n_components selects enough components to reach that ratio."""
    estimator = PCA(n_components=n_components, svd_solver="full")
    estimator.fit(X)
    # The hyper-parameter is stored untouched ...
    assert estimator.n_components == pytest.approx(n_components)
    # ... and the fitted attribute holds the resolved integer count.
    assert estimator.n_components_ == n_components_validated
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_score(svd_solver):
    """Check the average log-likelihood returned by score()."""
    n, p = 1000, 3
    rng = np.random.RandomState(0)
    X = rng.randn(n, p) * 0.1 + np.array([3, 4, 5])
    pca = PCA(n_components=2, svd_solver=svd_solver)
    pca.fit(X)

    # The score should match the differential entropy of the generating
    # isotropic Gaussian (sigma = 0.1) within a few percent.
    ll1 = pca.score(X)
    h = -0.5 * np.log(2 * np.pi * np.exp(1) * 0.1**2) * p
    assert_allclose(ll1 / h, 1, rtol=5e-2)

    # Data drawn with a larger sigma fits the model less well.
    ll2 = pca.score(rng.randn(n, p) * 0.2 + np.array([3, 4, 5]))
    assert ll1 > ll2

    # Whitening reduces the likelihood of the training data.
    pca = PCA(n_components=2, whiten=True, svd_solver=svd_solver)
    pca.fit(X)
    ll2 = pca.score(X)
    assert ll1 > ll2
|
|
|
|
|
def test_pca_score3():
    """Likelihood-based model selection picks the true latent dimension (1)."""
    n_samples, n_features = 200, 3
    rng = np.random.RandomState(0)
    Xl = rng.randn(n_samples, n_features) + rng.randn(n_samples, 1) * np.array(
        [3, 4, 5]
    ) + np.array([1, 0, 7])
    Xt = rng.randn(n_samples, n_features) + rng.randn(n_samples, 1) * np.array(
        [3, 4, 5]
    ) + np.array([1, 0, 7])

    # Score a held-out set for every candidate number of components.
    scores = np.zeros(n_features)
    for n_components in range(n_features):
        model = PCA(n_components=n_components, svd_solver="full")
        model.fit(Xl)
        scores[n_components] = model.score(Xt)

    assert scores.argmax() == 1
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_sanity_noise_variance(svd_solver):
    """Explained variances must never fall below the estimated noise floor."""
    X, _ = datasets.load_digits(return_X_y=True)
    estimator = PCA(n_components=30, svd_solver=svd_solver, random_state=0)
    estimator.fit(X)
    gap = estimator.explained_variance_ - estimator.noise_variance_
    assert np.all(gap >= 0)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["arpack", "randomized"])
def test_pca_score_consistency_solvers(svd_solver):
    """score() agrees between the exact and the approximate solvers."""
    X, _ = datasets.load_digits(return_X_y=True)
    reference = PCA(n_components=30, svd_solver="full", random_state=0)
    candidate = PCA(n_components=30, svd_solver=svd_solver, random_state=0)
    reference.fit(X)
    candidate.fit(X)
    assert_allclose(reference.score(X), candidate.score(X), rtol=5e-6)
|
|
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", ["full", "randomized"])
def test_pca_zero_noise_variance_edge_cases(svd_solver):
    """noise_variance_ is exactly 0 when all components are kept."""
    n, p = 100, 3
    rng = np.random.RandomState(0)
    X = rng.randn(n, p) * 0.1 + np.array([3, 4, 5])

    pca = PCA(n_components=p, svd_solver=svd_solver)
    pca.fit(X)
    assert pca.noise_variance_ == 0

    # score() must not fail (e.g. divide by zero) with zero noise variance.
    pca.score(X)

    # Same checks on the transposed (wide) data.
    pca.fit(X.T)
    assert pca.noise_variance_ == 0

    pca.score(X.T)
|
|
|
|
|
@pytest.mark.parametrize(
    "n_samples, n_features, n_components, expected_solver",
    [
        # Each case encodes the solver that the "auto" heuristic is expected
        # to select for the given data shape and number of components.
        (10, 50, 5, "full"),
        (1000, 50, 50, "covariance_eigh"),
        (1000, 500, 400, "full"),
        (1000, 500, 10, "randomized"),
        # Float n_components (explained-variance ratio) case.
        (1000, 500, 0.5, "full"),
    ],
)
def test_pca_svd_solver_auto(n_samples, n_features, n_components, expected_solver):
    """svd_solver='auto' resolves to the expected concrete solver."""
    data = np.random.RandomState(0).uniform(size=(n_samples, n_features))
    pca_auto = PCA(n_components=n_components, random_state=0)
    pca_test = PCA(
        n_components=n_components, svd_solver=expected_solver, random_state=0
    )
    pca_auto.fit(data)
    # _fit_svd_solver records which concrete solver "auto" dispatched to.
    assert pca_auto._fit_svd_solver == expected_solver
    pca_test.fit(data)
    assert_allclose(pca_auto.components_, pca_test.components_)
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_deterministic_output(svd_solver):
    """Repeated fits on the same data yield identical projections."""
    rng = np.random.RandomState(0)
    data = rng.rand(10, 10)

    rows = np.zeros((20, 2))
    for trial in range(20):
        estimator = PCA(n_components=2, svd_solver=svd_solver, random_state=rng)
        rows[trial, :] = estimator.fit_transform(data)[0]
    # Every trial must reproduce the first row exactly (within tolerance).
    assert_allclose(rows, np.tile(rows[0, :], 20).reshape(20, 2))
|
|
|
|
|
@pytest.mark.parametrize("svd_solver", PCA_SOLVERS)
def test_pca_dtype_preservation(svd_solver, global_random_seed):
    """Run both dtype checks: float preservation and integer upcast."""
    check_pca_float_dtype_preservation(svd_solver, global_random_seed)
    check_pca_int_dtype_upcast_to_double(svd_solver)
|
|
|
|
|
def check_pca_float_dtype_preservation(svd_solver, seed):
    """Fitting in float32/float64 must preserve the input dtype."""
    X = np.random.RandomState(seed).rand(1000, 4)
    X_float64 = X.astype(np.float64, copy=False)
    X_float32 = X.astype(np.float32)

    pca_64 = PCA(n_components=3, svd_solver=svd_solver, random_state=seed).fit(
        X_float64
    )
    pca_32 = PCA(n_components=3, svd_solver=svd_solver, random_state=seed).fit(
        X_float32
    )

    # Fitted attributes and transform outputs keep the training dtype.
    assert pca_64.components_.dtype == np.float64
    assert pca_32.components_.dtype == np.float32
    assert pca_64.transform(X_float64).dtype == np.float64
    assert pca_32.transform(X_float32).dtype == np.float32

    # The two fits only agree up to float32 precision, hence the loose
    # tolerances.
    assert_allclose(pca_64.components_, pca_32.components_, rtol=1e-3, atol=1e-3)
|
|
|
|
|
def check_pca_int_dtype_upcast_to_double(svd_solver):
    """Integer inputs of any width must be upcast to float64."""
    X_i64 = np.random.RandomState(0).randint(0, 1000, (1000, 4))
    X_i64 = X_i64.astype(np.int64, copy=False)
    X_i32 = X_i64.astype(np.int32, copy=False)

    pca_64 = PCA(n_components=3, svd_solver=svd_solver, random_state=0).fit(X_i64)
    pca_32 = PCA(n_components=3, svd_solver=svd_solver, random_state=0).fit(X_i32)

    # Regardless of the integer width, everything comes out as float64.
    for model, X_int in ((pca_64, X_i64), (pca_32, X_i32)):
        assert model.components_.dtype == np.float64
        assert model.transform(X_int).dtype == np.float64

    assert_allclose(pca_64.components_, pca_32.components_, rtol=1e-4)
|
|
|
|
|
def test_pca_n_components_mostly_explained_variance_ratio():
    """A target ratio just below 1.0 must still select all components."""
    X, y = load_iris(return_X_y=True)
    pca1 = PCA().fit(X, y)

    # Cumulative explained variance ratio reached by all but the last
    # component; requesting it should still require every component.
    n_components = pca1.explained_variance_ratio_.cumsum()[-2]
    pca2 = PCA(n_components=n_components).fit(X, y)
    assert pca2.n_components_ == X.shape[1]
|
|
|
|
|
def test_assess_dimension_bad_rank():
    """_assess_dimension rejects ranks outside [1, n_features - 1]."""
    spectrum = np.array([1, 1e-30, 1e-30, 1e-30])
    n_samples = 10
    # 0 is below the lower bound, 5 exceeds n_features - 1 = 3.
    for invalid_rank in (0, 5):
        with pytest.raises(ValueError, match=r"should be in \[1, n_features - 1\]"):
            _assess_dimension(spectrum, invalid_rank, n_samples)
|
|
|
|
|
def test_small_eigenvalues_mle():
    """Ranks spanning tiny eigenvalues score -inf; the MLE picks rank 1."""
    spectrum = np.array([1, 1e-30, 1e-30, 1e-30])

    # Rank 1 keeps the tiny eigenvalues in the noise term: finite score.
    assert _assess_dimension(spectrum, rank=1, n_samples=10) > -np.inf

    # Any rank that absorbs a tiny eigenvalue is impossible: -inf score.
    for candidate_rank in (2, 3):
        assert _assess_dimension(spectrum, candidate_rank, 10) == -np.inf

    assert _infer_dimension(spectrum, 10) == 1
|
|
|
|
|
def test_mle_redundant_data():
    """MLE selection collapses heavily redundant features to one component."""
    # 18 repeated + 1 redundant feature derived from a single informative one.
    X, _ = datasets.make_classification(
        n_features=20,
        n_informative=1,
        n_repeated=18,
        n_redundant=1,
        n_clusters_per_class=1,
        random_state=42,
    )
    estimator = PCA(n_components="mle").fit(X)
    assert estimator.n_components_ == 1
|
|
|
|
|
def test_fit_mle_too_few_samples():
    """MLE selection requires at least as many samples as features."""
    # 20 samples < 21 features: the MLE criterion is undefined.
    X, _ = datasets.make_classification(n_samples=20, n_features=21, random_state=42)

    estimator = PCA(n_components="mle", svd_solver="full")
    expected_msg = "n_components='mle' is only supported if n_samples >= n_features"
    with pytest.raises(ValueError, match=expected_msg):
        estimator.fit(X)
|
|
|
|
|
def test_mle_simple_case():
    """MLE finds dimension n - 1 when one column is a linear combination."""
    n_samples, n_dim = 1000, 10
    data = np.random.RandomState(0).randn(n_samples, n_dim)
    # The last column is the mean of the others, so the data has rank n_dim - 1.
    data[:, -1] = np.mean(data[:, :-1], axis=-1)
    estimator = PCA("mle", svd_solver="full")
    estimator.fit(data)
    assert estimator.n_components_ == n_dim - 1
|
|
|
|
|
def test_assess_dimesion_rank_one():
    # NOTE(review): "dimesion" is a typo in the test name; kept as-is to
    # preserve the collected test id.
    """_assess_dimension handles an exactly rank-one spectrum."""
    n_samples, n_features = 9, 6
    # All-ones matrix: rank one by construction.
    X = np.ones((n_samples, n_features))
    _, s, _ = np.linalg.svd(X, full_matrices=True)

    # All singular values but the first are numerically zero.
    assert_allclose(s[1:], np.zeros(n_features - 1), atol=1e-12)

    # Rank 1 gets a finite log-likelihood; every larger rank is impossible.
    assert np.isfinite(_assess_dimension(s, rank=1, n_samples=n_samples))
    for rank in range(2, n_features):
        assert _assess_dimension(s, rank, n_samples) == -np.inf
|
|
|
|
|
def test_pca_randomized_svd_n_oversamples():
    """Check that exposing and setting `n_oversamples` will provide accurate results
    even when `X` as a large number of features.

    Non-regression test for:
    https://github.com/scikit-learn/scikit-learn/issues/20589
    """
    rng = np.random.RandomState(0)
    n_features = 100
    X = rng.randn(1_000, n_features)

    # With n_oversamples as large as n_features, the randomized solver should
    # match the exact solvers on this problem.
    pca_randomized = PCA(
        n_components=1,
        svd_solver="randomized",
        n_oversamples=n_features,
        random_state=0,
    ).fit(X)
    pca_full = PCA(n_components=1, svd_solver="full").fit(X)
    pca_arpack = PCA(n_components=1, svd_solver="arpack", random_state=0).fit(X)

    # Compare absolute values because component signs are arbitrary.
    assert_allclose(np.abs(pca_full.components_), np.abs(pca_arpack.components_))
    assert_allclose(np.abs(pca_randomized.components_), np.abs(pca_arpack.components_))
|
|
|
|
|
def test_feature_names_out():
    """get_feature_names_out yields 'pca0', 'pca1', ... for each component."""
    estimator = PCA(n_components=2).fit(iris.data)

    expected = [f"pca{i}" for i in range(2)]
    assert_array_equal(expected, estimator.get_feature_names_out())
|
|
|
|
|
@pytest.mark.parametrize("copy", [True, False])
def test_variance_correctness(copy):
    """Check the accuracy of PCA's internal variance calculation.

    The total variance implied by ``explained_variance_`` and
    ``explained_variance_ratio_`` must equal the empirical total variance.
    """
    rng = np.random.RandomState(0)
    X = rng.randn(1000, 200)
    # Bug fix: the parametrized `copy` flag was never forwarded to PCA, so
    # both variants exercised the default `copy=True` code path.
    pca = PCA(copy=copy).fit(X)
    pca_var = pca.explained_variance_ / pca.explained_variance_ratio_
    # Centering (the only possible in-place modification of X with
    # copy=False) does not change the variance, so this stays valid.
    true_var = np.var(X, ddof=1, axis=0).sum()
    np.testing.assert_allclose(pca_var, true_var)
|
|
|
|
|
def check_array_api_get_precision(name, estimator, array_namespace, device, dtype_name):
    """Compare get_precision/get_covariance between NumPy and array API fits."""
    xp = _array_api_for_tests(array_namespace, device)
    iris_np = iris.data.astype(dtype_name)
    iris_xp = xp.asarray(iris_np, device=device)

    # Reference fit on NumPy inputs.
    estimator.fit(iris_np)
    precision_np = estimator.get_precision()
    covariance_np = estimator.get_covariance()

    # Looser relative tolerance for float32 inputs.
    rtol = 2e-4 if iris_np.dtype == "float32" else 2e-7
    with config_context(array_api_dispatch=True):
        estimator_xp = clone(estimator).fit(iris_xp)
        precision_xp = estimator_xp.get_precision()
        # iris has 4 features, so both matrices are 4x4.
        assert precision_xp.shape == (4, 4)
        assert precision_xp.dtype == iris_xp.dtype

        assert_allclose(
            _convert_to_numpy(precision_xp, xp=xp),
            precision_np,
            rtol=rtol,
            atol=_atol_for_type(dtype_name),
        )
        covariance_xp = estimator_xp.get_covariance()
        assert covariance_xp.shape == (4, 4)
        assert covariance_xp.dtype == iris_xp.dtype

        assert_allclose(
            _convert_to_numpy(covariance_xp, xp=xp),
            covariance_np,
            rtol=rtol,
            atol=_atol_for_type(dtype_name),
        )
|
|
|
|
|
@pytest.mark.parametrize(
    "array_namespace, device, dtype_name", yield_namespace_device_dtype_combinations()
)
@pytest.mark.parametrize(
    "check",
    [check_array_api_input_and_values, check_array_api_get_precision],
    ids=_get_check_estimator_ids,
)
@pytest.mark.parametrize(
    "estimator",
    [
        # Representative configurations of the array-API-capable solvers.
        PCA(n_components=2, svd_solver="full"),
        PCA(n_components=2, svd_solver="full", whiten=True),
        PCA(n_components=0.1, svd_solver="full", whiten=True),
        PCA(n_components=2, svd_solver="covariance_eigh"),
        PCA(n_components=2, svd_solver="covariance_eigh", whiten=True),
        PCA(
            n_components=2,
            svd_solver="randomized",
            power_iteration_normalizer="QR",
            random_state=0,
        ),
    ],
    ids=_get_check_estimator_ids,
)
def test_pca_array_api_compliance(
    estimator, check, array_namespace, device, dtype_name
):
    """Run the generic array API compliance checks on PCA configurations."""
    name = estimator.__class__.__name__
    check(name, estimator, array_namespace, device=device, dtype_name=dtype_name)
|
|
|
|
|
@pytest.mark.parametrize(
    "array_namespace, device, dtype_name", yield_namespace_device_dtype_combinations()
)
@pytest.mark.parametrize(
    "check",
    [check_array_api_get_precision],
    ids=_get_check_estimator_ids,
)
@pytest.mark.parametrize(
    "estimator",
    [
        # n_components="mle" needs the "full" solver and is checked
        # separately from the generic compliance tests above.
        PCA(n_components="mle", svd_solver="full"),
    ],
    ids=_get_check_estimator_ids,
)
def test_pca_mle_array_api_compliance(
    estimator, check, array_namespace, device, dtype_name
):
    """Array API compliance for PCA(n_components='mle')."""
    name = estimator.__class__.__name__
    check(name, estimator, array_namespace, device=device, dtype_name=dtype_name)

    # Additionally compare a NumPy fit against a fit with array API inputs.
    xp = _array_api_for_tests(array_namespace, device)

    X, y = make_classification(random_state=42)
    X = X.astype(dtype_name, copy=False)
    atol = _atol_for_type(X.dtype)

    est = clone(estimator)

    X_xp = xp.asarray(X, device=device)
    y_xp = xp.asarray(y, device=device)

    # Reference fit on NumPy inputs.
    est.fit(X, y)

    components_np = est.components_
    explained_variance_np = est.explained_variance_

    est_xp = clone(est)
    with config_context(array_api_dispatch=True):
        est_xp.fit(X_xp, y_xp)
        components_xp = est_xp.components_
        # Fitted attributes must live on the same device as the input.
        assert array_device(components_xp) == array_device(X_xp)
        components_xp_np = _convert_to_numpy(components_xp, xp=xp)

        explained_variance_xp = est_xp.explained_variance_
        assert array_device(explained_variance_xp) == array_device(X_xp)
        explained_variance_xp_np = _convert_to_numpy(explained_variance_xp, xp=xp)

    assert components_xp_np.dtype == components_np.dtype
    assert components_xp_np.shape[1] == components_np.shape[1]
    assert explained_variance_xp_np.dtype == explained_variance_np.dtype

    # MLE may select slightly different numbers of components across
    # namespaces; compare only the shared leading components.
    min_components = min(components_xp_np.shape[0], components_np.shape[0])
    assert_allclose(
        explained_variance_xp_np[:min_components],
        explained_variance_np[:min_components],
        atol=atol,
    )

    # If the selected counts differ, any extra components must carry
    # (numerically) the same variance as the last shared one, i.e. the
    # discrepancy is only a tie-breaking artifact.
    if components_xp_np.shape[0] != components_np.shape[0]:
        reference_variance = explained_variance_np[-1]
        extra_variance_np = explained_variance_np[min_components:]
        extra_variance_xp_np = explained_variance_xp_np[min_components:]
        assert all(np.abs(extra_variance_np - reference_variance) < atol)
        assert all(np.abs(extra_variance_xp_np - reference_variance) < atol)
|
|
|
|
|
def test_array_api_error_and_warnings_on_unsupported_params():
    """Unsupported solver options with array API inputs raise or warn."""
    pytest.importorskip("array_api_compat")
    xp = pytest.importorskip("array_api_strict")
    iris_xp = xp.asarray(iris.data)

    # The arpack solver cannot work on array API inputs at all.
    pca = PCA(n_components=2, svd_solver="arpack", random_state=0)
    expected_msg = re.escape(
        "PCA with svd_solver='arpack' is not supported for Array API inputs."
    )
    with pytest.raises(ValueError, match=expected_msg):
        with config_context(array_api_dispatch=True):
            pca.fit(iris_xp)

    # Explicitly requesting LU normalization is an error ...
    pca.set_params(svd_solver="randomized", power_iteration_normalizer="LU")
    expected_msg = re.escape(
        "Array API does not support LU factorization. Set"
        " `power_iteration_normalizer='QR'` instead."
    )
    with pytest.raises(ValueError, match=expected_msg):
        with config_context(array_api_dispatch=True):
            pca.fit(iris_xp)

    # ... while "auto" falls back to QR and only warns about it.
    pca.set_params(svd_solver="randomized", power_iteration_normalizer="auto")
    expected_msg = re.escape(
        "Array API does not support LU factorization, falling back to QR instead. Set"
        " `power_iteration_normalizer='QR'` explicitly to silence this warning."
    )
    with pytest.warns(UserWarning, match=expected_msg):
        with config_context(array_api_dispatch=True):
            pca.fit(iris_xp)
|
|