|
import warnings |
|
from types import GeneratorType |
|
|
|
import numpy as np |
|
import pytest |
|
from numpy import linalg |
|
from scipy.sparse import issparse |
|
from scipy.spatial.distance import ( |
|
cdist, |
|
cityblock, |
|
cosine, |
|
minkowski, |
|
pdist, |
|
squareform, |
|
) |
|
|
|
from sklearn import config_context |
|
from sklearn.exceptions import DataConversionWarning |
|
from sklearn.metrics.pairwise import ( |
|
PAIRED_DISTANCES, |
|
PAIRWISE_BOOLEAN_FUNCTIONS, |
|
PAIRWISE_DISTANCE_FUNCTIONS, |
|
PAIRWISE_KERNEL_FUNCTIONS, |
|
_euclidean_distances_upcast, |
|
additive_chi2_kernel, |
|
check_paired_arrays, |
|
check_pairwise_arrays, |
|
chi2_kernel, |
|
cosine_distances, |
|
cosine_similarity, |
|
euclidean_distances, |
|
haversine_distances, |
|
laplacian_kernel, |
|
linear_kernel, |
|
manhattan_distances, |
|
nan_euclidean_distances, |
|
paired_cosine_distances, |
|
paired_distances, |
|
paired_euclidean_distances, |
|
paired_manhattan_distances, |
|
pairwise_distances, |
|
pairwise_distances_argmin, |
|
pairwise_distances_argmin_min, |
|
pairwise_distances_chunked, |
|
pairwise_kernels, |
|
polynomial_kernel, |
|
rbf_kernel, |
|
sigmoid_kernel, |
|
) |
|
from sklearn.preprocessing import normalize |
|
from sklearn.utils._testing import ( |
|
assert_allclose, |
|
assert_almost_equal, |
|
assert_array_equal, |
|
ignore_warnings, |
|
) |
|
from sklearn.utils.fixes import ( |
|
BSR_CONTAINERS, |
|
COO_CONTAINERS, |
|
CSC_CONTAINERS, |
|
CSR_CONTAINERS, |
|
DOK_CONTAINERS, |
|
) |
|
from sklearn.utils.parallel import Parallel, delayed |
|
|
|
|
|
def test_pairwise_distances_for_dense_data(global_dtype):
    """Compare ``pairwise_distances`` on dense data with direct metric calls.

    Covers the "euclidean", "nan_euclidean" and "haversine" string metrics
    against their dedicated functions, tuple-of-tuples input, and the
    "cityblock"/"manhattan"/"cosine" strings against the scipy callables.
    """
    rng = np.random.RandomState(0)

    # Euclidean: X against itself.
    X = rng.random_sample((5, 4)).astype(global_dtype, copy=False)
    dispatched = pairwise_distances(X, metric="euclidean")
    direct = euclidean_distances(X)
    assert_allclose(dispatched, direct)
    assert dispatched.dtype == direct.dtype == global_dtype

    # Euclidean: X against a second array Y.
    Y = rng.random_sample((2, 4)).astype(global_dtype, copy=False)
    euclid_xy = pairwise_distances(X, Y, metric="euclidean")
    direct = euclidean_distances(X, Y)
    assert_allclose(euclid_xy, direct)
    assert euclid_xy.dtype == direct.dtype == global_dtype

    # NaN-aware euclidean on arrays that each contain one missing entry.
    X_masked = rng.random_sample((5, 4)).astype(global_dtype, copy=False)
    Y_masked = rng.random_sample((2, 4)).astype(global_dtype, copy=False)
    X_masked[0, 0] = np.nan
    Y_masked[0, 0] = np.nan
    dispatched_nan = pairwise_distances(X_masked, Y_masked, metric="nan_euclidean")
    direct_nan = nan_euclidean_distances(X_masked, Y_masked)
    assert_allclose(dispatched_nan, direct_nan)
    assert dispatched_nan.dtype == direct_nan.dtype == global_dtype

    # Nested tuples must give the same result (and dtype) as the arrays.
    X_tuples = tuple(tuple(row) for row in X)
    Y_tuples = tuple(tuple(row) for row in Y)
    from_tuples = pairwise_distances(X_tuples, Y_tuples, metric="euclidean")
    assert_allclose(euclid_xy, from_tuples)
    assert euclid_xy.dtype == from_tuples.dtype == global_dtype

    # Haversine: two-column data, each column rescaled from [0, 1) samples.
    X = rng.random_sample((5, 2)).astype(global_dtype, copy=False)
    X[:, 0] = (X[:, 0] - 0.5) * 2 * np.pi / 2
    X[:, 1] = (X[:, 1] - 0.5) * 2 * np.pi
    assert_allclose(pairwise_distances(X, metric="haversine"), haversine_distances(X))

    Y = rng.random_sample((2, 2)).astype(global_dtype, copy=False)
    Y[:, 0] = (Y[:, 0] - 0.5) * 2 * np.pi / 2
    Y[:, 1] = (Y[:, 1] - 0.5) * 2 * np.pi
    assert_allclose(
        pairwise_distances(X, Y, metric="haversine"), haversine_distances(X, Y)
    )

    # "cityblock" string versus the scipy callable, X against itself.
    by_name = pairwise_distances(X, metric="cityblock")
    by_callable = pairwise_distances(X, metric=cityblock)
    assert by_name.shape[0] == by_name.shape[1]
    assert by_name.shape[0] == X.shape[0]
    assert_allclose(by_name, by_callable)

    # "manhattan" string versus the scipy cityblock callable, X against Y.
    by_name = pairwise_distances(X, Y, metric="manhattan")
    by_callable = pairwise_distances(X, Y, metric=cityblock)
    assert by_name.shape[0] == X.shape[0]
    assert by_name.shape[1] == Y.shape[0]
    assert_allclose(by_name, by_callable)

    # "cosine" string versus the scipy callable, X against Y.
    by_name = pairwise_distances(X, Y, metric="cosine")
    by_callable = pairwise_distances(X, Y, metric=cosine)
    assert by_name.shape[0] == X.shape[0]
    assert by_name.shape[1] == Y.shape[0]
    assert_allclose(by_name, by_callable)
|
|
|
|
|
@pytest.mark.parametrize("coo_container", COO_CONTAINERS)
@pytest.mark.parametrize("csc_container", CSC_CONTAINERS)
@pytest.mark.parametrize("bsr_container", BSR_CONTAINERS)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_pairwise_distances_for_sparse_data(
    coo_container, csc_container, bsr_container, csr_container, global_dtype
):
    """Compare ``pairwise_distances`` on sparse data with direct metric calls.

    Also checks the scipy "minkowski" metric on the dense arrays and that
    "minkowski" raises ``TypeError`` when either input is sparse.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4)).astype(global_dtype, copy=False)
    Y = rng.random_sample((2, 4)).astype(global_dtype, copy=False)

    X_sparse = csr_container(X)
    Y_sparse = csr_container(Y)

    def check_manhattan_dtypes(lhs, rhs):
        # Only float64 is asserted to keep its dtype; for any other dtype the
        # same dtype assertion is expected to FAIL.
        # NOTE(review): presumably a known dtype-preservation gap in the
        # manhattan code path -- confirm.
        if global_dtype == np.float64:
            assert lhs.dtype == rhs.dtype == global_dtype
        else:
            with pytest.raises(AssertionError):
                assert lhs.dtype == rhs.dtype == global_dtype

    # CSR inputs: euclidean.
    dispatched = pairwise_distances(X_sparse, Y_sparse, metric="euclidean")
    direct = euclidean_distances(X_sparse, Y_sparse)
    assert_allclose(dispatched, direct)
    assert dispatched.dtype == direct.dtype == global_dtype

    # CSR inputs: cosine.
    dispatched = pairwise_distances(X_sparse, Y_sparse, metric="cosine")
    direct = cosine_distances(X_sparse, Y_sparse)
    assert_allclose(dispatched, direct)
    assert dispatched.dtype == direct.dtype == global_dtype

    # Manhattan with a mix of sparse formats on both sides.
    manhattan_sparse = pairwise_distances(
        X_sparse, csc_container(Y), metric="manhattan"
    )
    direct = manhattan_distances(bsr_container(X), coo_container(Y))
    assert_allclose(manhattan_sparse, direct)
    check_manhattan_dtypes(manhattan_sparse, direct)

    # The sparse result must also agree with the dense computation.
    direct = manhattan_distances(X, Y)
    assert_allclose(manhattan_sparse, direct)
    check_manhattan_dtypes(manhattan_sparse, direct)

    # Minkowski with keyword arguments: string name versus scipy callable.
    kwds = {"p": 2.0}
    assert_allclose(
        pairwise_distances(X, Y, metric="minkowski", **kwds),
        pairwise_distances(X, Y, metric=minkowski, **kwds),
    )

    # Same comparison with X against itself.
    kwds = {"p": 2.0}
    assert_allclose(
        pairwise_distances(X, metric="minkowski", **kwds),
        pairwise_distances(X, metric=minkowski, **kwds),
    )

    # "minkowski" rejects sparse input on either side.
    with pytest.raises(TypeError):
        pairwise_distances(X_sparse, metric="minkowski")
    with pytest.raises(TypeError):
        pairwise_distances(X, Y_sparse, metric="minkowski")
|
|
|
|
|
|
|
|
|
@ignore_warnings(category=DeprecationWarning)
@pytest.mark.parametrize("metric", PAIRWISE_BOOLEAN_FUNCTIONS)
def test_pairwise_boolean_distance(metric):
    """Check boolean metrics and their ``DataConversionWarning`` behaviour.

    Float input must yield all-zero distances (NaN/inf are mapped to 0 first)
    and must trigger the conversion warning; boolean ``X`` on its own must
    not warn.
    """
    rng = np.random.RandomState(0)
    X = rng.randn(5, 4)
    # Y is a copy of X that differs in a single entry.
    Y = X.copy()
    Y[0, 0] = 1 - Y[0, 0]

    # The conversion warning is not the subject of this first check.
    with ignore_warnings(category=DataConversionWarning):
        for other in (Y, None):
            distances = pairwise_distances(X, other, metric=metric)
            # Replace NaN and +/-inf in place so only finite values remain.
            np.nan_to_num(distances, nan=0, posinf=0, neginf=0, copy=False)
            assert not (distances != 0).any()

    # Non-boolean X alone triggers the warning.
    msg = "Data was converted to boolean for metric %s" % metric
    with pytest.warns(DataConversionWarning, match=msg):
        pairwise_distances(X, metric=metric)

    # Boolean X with a non-boolean Y still triggers the warning.
    with pytest.warns(DataConversionWarning, match=msg):
        pairwise_distances(X.astype(bool), Y=Y, metric=metric)

    # Boolean X alone: the warning is promoted to an error, so none may fire.
    with warnings.catch_warnings():
        warnings.simplefilter("error", DataConversionWarning)
        pairwise_distances(X.astype(bool), metric=metric)
|
|
|
|
|
def test_no_data_conversion_warning():
    """The "minkowski" metric must not emit ``DataConversionWarning``."""
    X = np.random.RandomState(0).randn(5, 4)
    with warnings.catch_warnings():
        # Promote the warning to an error so that any occurrence fails the test.
        warnings.simplefilter("error", DataConversionWarning)
        pairwise_distances(X, metric="minkowski")
|
|
|
|
|
@pytest.mark.parametrize("func", [pairwise_distances, pairwise_kernels])
def test_pairwise_precomputed(func):
    """Exercise ``metric="precomputed"`` for both distances and kernels.

    Mis-shaped input raises ``ValueError``; well-shaped float arrays are
    returned as the very same object; integer input comes back with a float
    dtype and a nested list comes back as an ``ndarray``.
    """
    shape_error = ".* shape .*"
    # X alone, then X paired with two differently shaped Y.
    mis_shaped = [
        (np.zeros((5, 3)),),
        (np.zeros((5, 3)), np.zeros((4, 4))),
        (np.zeros((5, 3)), np.zeros((4, 3))),
    ]
    for args in mis_shaped:
        with pytest.raises(ValueError, match=shape_error):
            func(*args, metric="precomputed")

    # Valid input is handed back as the identical object.
    square = np.zeros((5, 5))
    assert func(square, metric="precomputed") is square

    rectangular = np.zeros((5, 3))
    assert func(rectangular, np.zeros((3, 3)), metric="precomputed") is rectangular

    # Integer input is converted to a floating-point result.
    as_float = func(np.array([[1]], dtype="int"), metric="precomputed")
    assert as_float.dtype.kind == "f"

    # A nested list is converted to an ndarray.
    from_list = func([[1.0]], metric="precomputed")
    assert isinstance(from_list, np.ndarray)
|
|
|
|
|
def test_pairwise_precomputed_non_negative():
    """A precomputed distance matrix filled with -1 must be rejected."""
    all_negative = np.full((5, 5), -1)
    with pytest.raises(ValueError, match=".* non-negative values.*"):
        pairwise_distances(all_negative, metric="precomputed")
|
|
|
|
|
_minkowski_kwds = {"w": np.arange(1, 5).astype("double", copy=False), "p": 1} |
|
|
|
|
|
def callable_rbf_kernel(x, y, **kwds):
    """Evaluate ``rbf_kernel`` on one pair of samples and return a scalar.

    ``x`` and ``y`` are promoted to 2-D before the call; the resulting kernel
    matrix is collapsed to a Python scalar with ``.item()``. Extra keyword
    arguments are forwarded to ``rbf_kernel``.
    """
    x_2d = np.atleast_2d(x)
    y_2d = np.atleast_2d(y)
    return rbf_kernel(x_2d, y_2d, **kwds).item()
|
|
|
|
|
@pytest.mark.parametrize(
    "func, metric, kwds",
    [
        (pairwise_distances, "euclidean", {}),
        (pairwise_distances, minkowski, _minkowski_kwds),
        (pairwise_distances, "minkowski", _minkowski_kwds),
        (pairwise_kernels, "polynomial", {"degree": 1}),
        (pairwise_kernels, callable_rbf_kernel, {"gamma": 0.1}),
    ],
)
@pytest.mark.parametrize("dtype", [np.float64, np.float32, int])
def test_pairwise_parallel(func, metric, kwds, dtype):
    """Results with ``n_jobs=2`` must match those obtained with ``n_jobs=1``."""
    rng = np.random.RandomState(0)
    X = np.array(5 * rng.random_sample((5, 4)), dtype=dtype)
    Y = np.array(5 * rng.random_sample((3, 4)), dtype=dtype)

    def compute(n_jobs, *second):
        # ``second`` is either empty (X against itself) or ``(Y,)``.
        return func(X, *second, metric=metric, n_jobs=n_jobs, **kwds)

    sequential = compute(1)
    parallel = compute(2)
    assert_allclose(sequential, parallel)

    sequential = compute(1, Y)
    parallel = compute(2, Y)
    assert_allclose(sequential, parallel)
|
|
|
|
|
def test_pairwise_callable_nonstrict_metric():
    """A callable metric's value is kept even between a sample and itself.

    The callable below returns 5 for every pair, so the single entry of the
    1x1 result must be 5.
    """

    def always_five(x, y):
        return 5

    distances = pairwise_distances([[1.0]], metric=always_five)
    assert distances[0, 0] == 5
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    "metric",
    ["rbf", "laplacian", "sigmoid", "polynomial", "linear", "chi2", "additive_chi2"],
)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_pairwise_kernels(metric, csr_container):
    """``pairwise_kernels`` by name must match the corresponding kernel function.

    Checked on arrays, on nested tuples and, except for the two chi2 kernels,
    on CSR input.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((2, 4))
    kernel = PAIRWISE_KERNEL_FUNCTIONS[metric]

    # X against itself.
    assert_allclose(pairwise_kernels(X, metric=metric), kernel(X))

    # X against Y.
    K_xy = pairwise_kernels(X, Y=Y, metric=metric)
    assert_allclose(K_xy, kernel(X, Y=Y))

    # Nested tuples instead of arrays.
    X_tuples = tuple(tuple(row) for row in X)
    Y_tuples = tuple(tuple(row) for row in Y)
    K_tuples = pairwise_kernels(X_tuples, Y_tuples, metric=metric)
    assert_allclose(K_xy, K_tuples)

    # Sparse input; the chi2 kernels are skipped for this part.
    X_sparse = csr_container(X)
    Y_sparse = csr_container(Y)
    if metric not in ["chi2", "additive_chi2"]:
        K_sparse = pairwise_kernels(X_sparse, Y=Y_sparse, metric=metric)
        assert_allclose(K_sparse, K_tuples)
|
|
|
|
|
def test_pairwise_kernels_callable():
    """A callable kernel must reproduce ``rbf_kernel`` through ``pairwise_kernels``.

    Checked for ``Y`` distinct from ``X`` and for ``Y`` being ``X`` itself.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((2, 4))

    kwds = {"gamma": 0.1}
    for other in (Y, X):
        from_callable = pairwise_kernels(
            X, Y=other, metric=callable_rbf_kernel, **kwds
        )
        reference = rbf_kernel(X, Y=other, **kwds)
        assert_allclose(from_callable, reference)
|
|
|
|
|
def test_pairwise_kernels_filter_param():
    """``filter_params=True`` drops unknown keywords; otherwise they raise."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((2, 4))
    expected = rbf_kernel(X, Y, gamma=0.1)

    # "blabla" is not a parameter of the rbf kernel.
    params = {"gamma": 0.1, "blabla": ":)"}
    filtered = pairwise_kernels(X, Y, metric="rbf", filter_params=True, **params)
    assert_allclose(expected, filtered)

    # Without filtering, the extra keyword results in a TypeError.
    with pytest.raises(TypeError):
        pairwise_kernels(X, Y, metric="rbf", **params)
|
|
|
|
|
@pytest.mark.parametrize("metric, func", PAIRED_DISTANCES.items())
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_paired_distances(metric, func, csr_container):
    """``paired_distances`` must agree with each registered paired function.

    Also compares against CSR input and, when a pairwise function with the
    same name exists, against the diagonal of the full pairwise matrix.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((5, 4))

    by_name = paired_distances(X, Y, metric=metric)
    assert_allclose(by_name, func(X, Y))
    assert_allclose(by_name, func(csr_container(X), csr_container(Y)))

    if metric in PAIRWISE_DISTANCE_FUNCTIONS:
        # Row-wise paired distances are the diagonal of the pairwise matrix.
        full_matrix = PAIRWISE_DISTANCE_FUNCTIONS[metric](X, Y)
        assert_allclose(np.diag(full_matrix), by_name)
|
|
|
|
|
def test_paired_distances_callable(global_dtype):
    """A callable metric must match "manhattan"; mismatched row counts raise."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4)).astype(global_dtype, copy=False)
    Y = rng.random_sample((5, 4)).astype(global_dtype, copy=False)

    by_name = paired_distances(X, Y, metric="manhattan")
    by_callable = paired_distances(
        X, Y, metric=lambda x, y: np.abs(x - y).sum(axis=0)
    )
    assert_allclose(by_name, by_callable)

    # Y now has 3 rows while X has 5, which must be rejected.
    Y = rng.random_sample((3, 4))
    with pytest.raises(ValueError):
        paired_distances(X, Y)
|
|
|
|
|
@pytest.mark.parametrize("dok_container", DOK_CONTAINERS)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_pairwise_distances_argmin_min(dok_container, csr_container, global_dtype):
    """Check ``pairwise_distances_argmin`` and ``pairwise_distances_argmin_min``.

    A tiny hand-computed example is run through several metrics (dense and
    sparse), then larger random data is used to compare against a full
    distance matrix, to compare ``axis=0`` with ``axis=1`` on swapped inputs,
    and to compare C- with Fortran-ordered input.
    """
    X = np.asarray([[0], [1]], dtype=global_dtype)
    Y = np.asarray([[-2], [3]], dtype=global_dtype)

    Xsp = dok_container(X)
    Ysp = csr_container(Y, dtype=global_dtype)

    expected_idx = [0, 1]
    expected_vals = [2, 2]
    expected_vals_sq = [4, 4]

    def check_named_metric(A, B, name):
        # Run both entry points for one metric name and compare them with the
        # hand-computed indices and (non-squared) distances.
        nearest, nearest_dist = pairwise_distances_argmin_min(A, B, metric=name)
        nearest_only = pairwise_distances_argmin(A, B, metric=name)
        assert_allclose(nearest, expected_idx)
        assert_allclose(nearest_only, expected_idx)
        assert_allclose(nearest_dist, expected_vals)
        return nearest, nearest_dist

    # Euclidean metric, dense then sparse.
    check_named_metric(X, Y, "euclidean")
    idxsp, valssp = check_named_metric(Xsp, Ysp, "euclidean")

    # Exact type check on purpose: subclasses of ndarray would not pass.
    assert type(idxsp) is np.ndarray
    assert type(valssp) is np.ndarray

    # Squared euclidean, requested by name and through metric_kwargs.
    squared = {"squared": True}
    idx, vals = pairwise_distances_argmin_min(X, Y, metric="sqeuclidean")
    idx2, vals2 = pairwise_distances_argmin_min(
        X, Y, metric="euclidean", metric_kwargs=squared
    )
    idx3 = pairwise_distances_argmin(X, Y, metric="sqeuclidean")
    idx4 = pairwise_distances_argmin(X, Y, metric="euclidean", metric_kwargs=squared)

    assert_allclose(vals, expected_vals_sq)
    assert_allclose(vals2, expected_vals_sq)

    for found in (idx, idx2, idx3, idx4):
        assert_allclose(found, expected_idx)

    # Manhattan metric, dense then sparse.
    check_named_metric(X, Y, "manhattan")
    check_named_metric(Xsp, Ysp, "manhattan")

    # Minkowski given as the scipy callable, then by name.
    for minkowski_metric in (minkowski, "minkowski"):
        idx, vals = pairwise_distances_argmin_min(
            X, Y, metric=minkowski_metric, metric_kwargs={"p": 2}
        )
        assert_allclose(idx, expected_idx)
        assert_allclose(vals, expected_vals)

    # Larger random data: compare with the full distance matrix.
    rng = np.random.RandomState(0)
    X = rng.randn(97, 149)
    Y = rng.randn(111, 149)

    dist = pairwise_distances(X, Y, metric="manhattan")
    dist_orig_ind = dist.argmin(axis=0)
    dist_orig_val = dist[dist_orig_ind, np.arange(len(dist_orig_ind))]

    dist_chunked_ind, dist_chunked_val = pairwise_distances_argmin_min(
        X, Y, axis=0, metric="manhattan"
    )
    assert_allclose(dist_orig_ind, dist_chunked_ind, rtol=1e-7)
    assert_allclose(dist_orig_val, dist_chunked_val, rtol=1e-7)

    # axis=0 on (X, Y) must equal axis=1 on (Y, X), and both axes must agree
    # on (X, X) -- first for argmin_min ...
    for first, second in ((X, Y), (X, X)):
        swapped_first, swapped_second = (Y, X) if second is Y else (X, X)
        argmin_0, dist_0 = pairwise_distances_argmin_min(first, second, axis=0)
        argmin_1, dist_1 = pairwise_distances_argmin_min(
            swapped_first, swapped_second, axis=1
        )
        assert_allclose(dist_0, dist_1)
        assert_array_equal(argmin_0, argmin_1)

    # ... then for argmin alone.
    for first, second in ((X, Y), (X, X)):
        swapped_first, swapped_second = (Y, X) if second is Y else (X, X)
        argmin_0 = pairwise_distances_argmin(first, second, axis=0)
        argmin_1 = pairwise_distances_argmin(swapped_first, swapped_second, axis=1)
        assert_array_equal(argmin_0, argmin_1)

    # Memory layout of the inputs must not change the result.
    argmin_C_contiguous = pairwise_distances_argmin(X, Y)
    argmin_F_contiguous = pairwise_distances_argmin(
        np.asfortranarray(X), np.asfortranarray(Y)
    )
    assert_array_equal(argmin_C_contiguous, argmin_F_contiguous)
|
|
|
|
|
def _reduce_func(dist, start): |
|
return dist[:, :100] |
|
|
|
|
|
def test_pairwise_distances_chunked_reduce(global_dtype):
    """Chunked distances reduced with ``_reduce_func`` must match a direct slice."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((400, 4)).astype(global_dtype, copy=False)

    # Reference: full distance matrix restricted to its first 100 columns.
    expected = pairwise_distances(X)[:, :100]

    chunk_gen = pairwise_distances_chunked(
        X, None, reduce_func=_reduce_func, working_memory=2**-16
    )
    assert isinstance(chunk_gen, GeneratorType)

    chunks = list(chunk_gen)
    # The tiny working_memory must have produced several chunks.
    assert len(chunks) > 1
    assert chunks[0].dtype == X.dtype

    # Stacking the reduced chunks must rebuild the reference slice.
    assert_allclose(np.vstack(chunks), expected, atol=1e-7)
|
|
|
|
|
def test_pairwise_distances_chunked_reduce_none(global_dtype):
    """A reduce function returning ``None`` must yield ``None`` for every chunk."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((10, 4)).astype(global_dtype, copy=False)

    def discard(dist, start):
        return None

    chunk_gen = pairwise_distances_chunked(
        X, None, reduce_func=discard, working_memory=2**-16
    )
    assert isinstance(chunk_gen, GeneratorType)

    chunks = list(chunk_gen)
    assert len(chunks) > 1
    assert all(chunk is None for chunk in chunks)
|
|
|
|
|
@pytest.mark.parametrize(
    "good_reduce",
    [
        lambda D, start: list(D),
        lambda D, start: np.array(D),
        lambda D, start: (list(D), list(D)),
        # The default argument binds each container at definition time,
        # avoiding the late-binding closure pitfall.
        *[
            lambda D, start, scipy_csr_type=scipy_csr_type: scipy_csr_type(D)
            for scipy_csr_type in CSR_CONTAINERS
        ],
        *[
            lambda D, start, scipy_dok_type=scipy_dok_type: (
                scipy_dok_type(D),
                np.array(D),
                list(D),
            )
            for scipy_dok_type in DOK_CONTAINERS
        ],
    ],
)
def test_pairwise_distances_chunked_reduce_valid(good_reduce):
    """Each of these reduce-function return types must be accepted."""
    X = np.arange(10).reshape(-1, 1)
    chunk_gen = pairwise_distances_chunked(
        X, None, reduce_func=good_reduce, working_memory=64
    )
    # Producing the first chunk without an exception is the whole check.
    next(chunk_gen)
|
|
|
|
|
@pytest.mark.parametrize(
    ("bad_reduce", "err_type", "message"),
    [
        # Wrong length: one row too many.
        (
            lambda D, s: np.concatenate([D, D[-1:]]),
            ValueError,
            r"length 11\..* input: 10\.",
        ),
        # Tuple whose second element has the wrong length.
        (
            lambda D, s: (D, np.concatenate([D, D[-1:]])),
            ValueError,
            r"length \(10, 11\)\..* input: 10\.",
        ),
        # Tuple whose first element has the wrong length.
        (lambda D, s: (D[:9], D), ValueError, r"length \(9, 10\)\..* input: 10\."),
        # Returned values that are not sequences at all.
        (
            lambda D, s: 7,
            TypeError,
            r"returned 7\. Expected sequence\(s\) of length 10\.",
        ),
        (
            lambda D, s: (7, 8),
            TypeError,
            r"returned \(7, 8\)\. Expected sequence\(s\) of length 10\.",
        ),
        (
            lambda D, s: (np.arange(10), 9),
            TypeError,
            r", 9\)\. Expected sequence\(s\) of length 10\.",
        ),
    ],
)
def test_pairwise_distances_chunked_reduce_invalid(
    global_dtype, bad_reduce, err_type, message
):
    """Badly shaped or non-sequence reduce results must raise with a clear message."""
    X = np.arange(10).reshape(-1, 1).astype(global_dtype, copy=False)
    chunk_gen = pairwise_distances_chunked(
        X, None, reduce_func=bad_reduce, working_memory=64
    )
    # The error is expected when the first chunk is requested.
    with pytest.raises(err_type, match=message):
        next(chunk_gen)
|
|
|
|
|
def check_pairwise_distances_chunked(X, Y, working_memory, metric="euclidean"):
    """Assert that chunked distances respect the memory budget and are correct.

    Parameters
    ----------
    X, Y : inputs forwarded to ``pairwise_distances_chunked``; ``Y`` may be
        ``None``, in which case ``X`` is compared with itself.
    working_memory : budget forwarded to ``pairwise_distances_chunked``,
        treated here as a number of MiB.
    metric : metric forwarded to both distance functions.
    """
    chunk_gen = pairwise_distances_chunked(
        X, Y, working_memory=working_memory, metric=metric
    )
    assert isinstance(chunk_gen, GeneratorType)
    blocks = list(chunk_gen)

    reference = X if Y is None else Y
    # Lower bound on a block: one row of len(reference) values at 8 bytes
    # each, expressed in MiB.
    min_block_mib = len(reference) * 8 * 2**-20
    byte_budget = max(working_memory, min_block_mib) * 2**20

    assert all(block.nbytes <= byte_budget for block in blocks)

    stacked = np.vstack(blocks)
    expected = pairwise_distances(X, reference, metric=metric)
    assert_allclose(stacked, expected, atol=1e-7)
|
|
|
|
|
@pytest.mark.parametrize("metric", ("euclidean", "l2", "sqeuclidean"))
def test_pairwise_distances_chunked_diagonal(metric, global_dtype):
    """The diagonal of chunked self-distances must be zero for large-scale data."""
    rng = np.random.RandomState(0)
    X = rng.normal(size=(1000, 10), scale=1e10).astype(global_dtype, copy=False)

    chunk_gen = pairwise_distances_chunked(X, working_memory=1, metric=metric)
    chunks = list(chunk_gen)
    assert len(chunks) > 1

    diagonal = np.diag(np.vstack(chunks))
    # NOTE(review): the expected value is 0, so a purely relative tolerance
    # presumably leaves no slack (exact comparison) -- confirm against the
    # assert_allclose helper in use.
    assert_allclose(diagonal, 0, rtol=1e-10)
|
|
|
|
|
@pytest.mark.parametrize("metric", ("euclidean", "l2", "sqeuclidean"))
def test_parallel_pairwise_distances_diagonal(metric, global_dtype):
    """With ``n_jobs=2`` the diagonal of self-distances must be (close to) zero."""
    rng = np.random.RandomState(0)
    X = rng.normal(size=(1000, 10), scale=1e10).astype(global_dtype, copy=False)

    parallel_distances = pairwise_distances(X, metric=metric, n_jobs=2)
    diagonal = np.diag(parallel_distances)
    assert_allclose(diagonal, 0, atol=1e-10)
|
|
|
|
|
@pytest.mark.filterwarnings("ignore:Could not adhere to working_memory config")
def test_pairwise_distances_chunked(global_dtype):
    """Run the chunked-distance checker over several inputs and memory budgets."""
    rng = np.random.RandomState(0)

    # X against itself, as an array.
    X = rng.random_sample((200, 4)).astype(global_dtype, copy=False)
    check_pairwise_distances_chunked(X, None, working_memory=1, metric="euclidean")

    # Budgets from 2**-16 up to 2**-1.
    for exponent in range(-16, 0):
        check_pairwise_distances_chunked(
            X, None, working_memory=2**exponent, metric="euclidean"
        )

    # X against itself, as a nested list.
    check_pairwise_distances_chunked(
        X.tolist(), None, working_memory=1, metric="euclidean"
    )

    # X against Y, as arrays and as nested lists.
    Y = rng.random_sample((100, 4)).astype(global_dtype, copy=False)
    check_pairwise_distances_chunked(X, Y, working_memory=1, metric="euclidean")
    check_pairwise_distances_chunked(
        X.tolist(), Y.tolist(), working_memory=1, metric="euclidean"
    )

    # A very large budget.
    check_pairwise_distances_chunked(X, Y, working_memory=10000, metric="euclidean")

    # A metric other than euclidean.
    check_pairwise_distances_chunked(X, Y, working_memory=1, metric="cityblock")

    # Precomputed input is yielded once, as the identical object.
    D = pairwise_distances(X)
    precomputed_gen = pairwise_distances_chunked(
        D, working_memory=2**-16, metric="precomputed"
    )
    assert isinstance(precomputed_gen, GeneratorType)
    assert next(precomputed_gen) is D
    with pytest.raises(StopIteration):
        next(precomputed_gen)
|
|
|
|
|
@pytest.mark.parametrize(
    "x_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
@pytest.mark.parametrize(
    "y_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
def test_euclidean_distances_known_result(x_array_constr, y_array_constr):
    """Distances from the point 0 to the points 1 and 2 must be 1 and 2."""
    origin = x_array_constr([[0]])
    targets = y_array_constr([[1], [2]])
    assert_allclose(euclidean_distances(origin, targets), [[1.0, 2.0]])
|
|
|
|
|
@pytest.mark.parametrize(
    "y_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
def test_euclidean_distances_with_norms(global_dtype, y_array_constr):
    """Pre-computed squared norms must not change the result -- if correct.

    Wrong norms (all zeros) must produce a different result.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((10, 10)).astype(global_dtype, copy=False)
    Y = rng.random_sample((20, 10)).astype(global_dtype, copy=False)

    # The norms are computed in float64 whatever the dtype of X and Y.
    X_norm_sq = (X.astype(np.float64) ** 2).sum(axis=1).reshape(1, -1)
    Y_norm_sq = (Y.astype(np.float64) ** 2).sum(axis=1).reshape(1, -1)

    Y = y_array_constr(Y)

    baseline = euclidean_distances(X, Y)
    norm_combinations = (
        {"X_norm_squared": X_norm_sq},
        {"Y_norm_squared": Y_norm_sq},
        {"X_norm_squared": X_norm_sq, "Y_norm_squared": Y_norm_sq},
    )
    with_norms = [euclidean_distances(X, Y, **norms) for norms in norm_combinations]
    for result in with_norms:
        assert_allclose(result, baseline)

    # Zeroed norms must lead to a result that differs from the baseline.
    wrong_D = euclidean_distances(
        X,
        Y,
        X_norm_squared=np.zeros_like(X_norm_sq),
        Y_norm_squared=np.zeros_like(Y_norm_sq),
    )
    with pytest.raises(AssertionError):
        assert_allclose(wrong_D, baseline)
|
|
|
|
|
@pytest.mark.parametrize("symmetric", [True, False])
def test_euclidean_distances_float32_norms(global_random_seed, symmetric):
    """float32 squared norms passed with float64 data must give the same result."""
    rng = np.random.RandomState(global_random_seed)
    X = rng.random_sample((10, 10))
    Y = X if symmetric else rng.random_sample((20, 10))

    # Norms are computed from a float32 copy of the float64 data.
    X_norm_sq = (X.astype(np.float32) ** 2).sum(axis=1).reshape(1, -1)
    Y_norm_sq = (Y.astype(np.float32) ** 2).sum(axis=1).reshape(1, -1)

    baseline = euclidean_distances(X, Y)
    only_x = euclidean_distances(X, Y, X_norm_squared=X_norm_sq)
    only_y = euclidean_distances(X, Y, Y_norm_squared=Y_norm_sq)
    both = euclidean_distances(
        X, Y, X_norm_squared=X_norm_sq, Y_norm_squared=Y_norm_sq
    )

    assert_allclose(only_x, baseline)
    assert_allclose(only_y, baseline)
    assert_allclose(both, baseline)
|
|
|
|
|
def test_euclidean_distances_norm_shapes():
    """Squared norms are accepted as 1-D, column or row vectors.

    Norm arrays of the wrong length must raise ``ValueError``.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((10, 10))
    Y = rng.random_sample((20, 10))

    X_norm_squared = (X**2).sum(axis=1)
    Y_norm_squared = (Y**2).sum(axis=1)

    flat = euclidean_distances(
        X, Y, X_norm_squared=X_norm_squared, Y_norm_squared=Y_norm_squared
    )

    # Column vectors (-1, 1) and row vectors (1, -1) must behave like 1-D.
    for shape in ((-1, 1), (1, -1)):
        reshaped = euclidean_distances(
            X,
            Y,
            X_norm_squared=X_norm_squared.reshape(shape),
            Y_norm_squared=Y_norm_squared.reshape(shape),
        )
        assert_allclose(reshaped, flat)

    # Truncated norm arrays no longer match the number of samples.
    with pytest.raises(ValueError, match="Incompatible dimensions for X"):
        euclidean_distances(X, Y, X_norm_squared=X_norm_squared[:5])
    with pytest.raises(ValueError, match="Incompatible dimensions for Y"):
        euclidean_distances(X, Y, Y_norm_squared=Y_norm_squared[:5])
|
|
|
|
|
@pytest.mark.parametrize(
    "x_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
@pytest.mark.parametrize(
    "y_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
def test_euclidean_distances(global_dtype, x_array_constr, y_array_constr):
    """``euclidean_distances(X, Y)`` must match scipy's ``cdist`` and keep the dtype."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10)).astype(global_dtype, copy=False)
    # Zero every entry below 0.8 so that the data is mostly zeros.
    X[X < 0.8] = 0
    Y = rng.random_sample((10, 10)).astype(global_dtype, copy=False)
    Y[Y < 0.8] = 0

    # The reference is computed from the dense arrays.
    expected = cdist(X, Y)

    X = x_array_constr(X)
    Y = y_array_constr(Y)
    computed = euclidean_distances(X, Y)

    assert_allclose(computed, expected, rtol=1e-6)
    assert computed.dtype == global_dtype
|
|
|
|
|
@pytest.mark.parametrize(
    "x_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
def test_euclidean_distances_sym(global_dtype, x_array_constr):
    """``euclidean_distances(X)`` must match scipy's ``pdist`` and keep the dtype."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10)).astype(global_dtype, copy=False)
    # Zero every entry below 0.8 so that the data is mostly zeros.
    X[X < 0.8] = 0

    # Reference from the dense array, expanded to a square matrix.
    expected = squareform(pdist(X))

    computed = euclidean_distances(x_array_constr(X))

    assert_allclose(computed, expected, rtol=1e-6)
    assert computed.dtype == global_dtype
|
|
|
|
|
@pytest.mark.parametrize("batch_size", [None, 5, 7, 101])
@pytest.mark.parametrize(
    "x_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
@pytest.mark.parametrize(
    "y_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
def test_euclidean_distances_upcast(batch_size, x_array_constr, y_array_constr):
    """``_euclidean_distances_upcast`` on float32 data must match ``cdist``.

    Checked for several batch sizes, including ``None``.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10)).astype(np.float32)
    X[X < 0.8] = 0
    Y = rng.random_sample((10, 10)).astype(np.float32)
    Y[Y < 0.8] = 0

    expected = cdist(X, Y)

    X = x_array_constr(X)
    Y = y_array_constr(Y)
    raw = _euclidean_distances_upcast(X, Y=Y, batch_size=batch_size)
    # Clip negatives to zero, then take the root before comparing with cdist.
    computed = np.sqrt(np.maximum(raw, 0))

    assert_allclose(computed, expected, rtol=1e-6)
|
|
|
|
|
@pytest.mark.parametrize("batch_size", [None, 5, 7, 101])
@pytest.mark.parametrize(
    "x_array_constr",
    [np.array] + CSR_CONTAINERS,
    ids=["dense"] + [container.__name__ for container in CSR_CONTAINERS],
)
def test_euclidean_distances_upcast_sym(batch_size, x_array_constr):
    """``_euclidean_distances_upcast`` with ``Y=X`` must match ``pdist``."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10)).astype(np.float32)
    X[X < 0.8] = 0

    expected = squareform(pdist(X))

    X = x_array_constr(X)
    raw = _euclidean_distances_upcast(X, Y=X, batch_size=batch_size)
    # Clip negatives to zero, then take the root before comparing with pdist.
    computed = np.sqrt(np.maximum(raw, 0))

    assert_allclose(computed, expected, rtol=1e-6)
|
|
|
|
|
@pytest.mark.parametrize(
    "dtype, eps, rtol",
    [
        (np.float32, 1e-4, 1e-5),
        pytest.param(
            np.float64,
            1e-8,
            0.99,
            marks=pytest.mark.xfail(reason="failing due to lack of precision"),
        ),
    ],
)
@pytest.mark.parametrize("dim", [1, 1000000])
def test_euclidean_distances_extreme_values(dtype, eps, rtol, dim):
    """Distance between two nearly identical points must match ``cdist``.

    Parameters
    ----------
    dtype : dtype of both input arrays.
    eps : offset added to every coordinate of the second point.
    rtol : relative tolerance of the comparison for this parameter set.
    dim : number of features of each point.

    The float64 parameter set is marked ``xfail``.
    """
    # Each array holds one sample whose ``dim`` coordinates are all equal.
    # np.full avoids building a Python list of up to 1,000,000 elements.
    X = np.full((1, dim), 1.0, dtype=dtype)
    Y = np.full((1, dim), 1.0 + eps, dtype=dtype)

    distances = euclidean_distances(X, Y)
    expected = cdist(X, Y)

    # Use the parametrized tolerance rather than a hard-coded one, so the
    # ``rtol`` column of the parameter table actually takes effect.
    assert_allclose(distances, expected, rtol=rtol)
|
|
|
|
|
@pytest.mark.parametrize("squared", [True, False])
def test_nan_euclidean_distances_equal_to_euclidean_distance(squared):
    """On data with no NaN set, both euclidean variants must agree."""
    rng = np.random.RandomState(1337)
    X = rng.randn(3, 4)
    Y = rng.randn(4, 4)

    plain = euclidean_distances(X, Y=Y, squared=squared)
    nan_aware = nan_euclidean_distances(X, Y=Y, squared=squared)
    assert_allclose(plain, nan_aware)
|
|
|
|
|
@pytest.mark.parametrize("X", [np.array([[np.inf, 0]]), np.array([[0, -np.inf]])])
@pytest.mark.parametrize(
    "Y",
    [np.array([[np.inf, 0]]), np.array([[0, -np.inf]]), None],
)
def test_nan_euclidean_distances_infinite_values(X, Y):
    """Infinite input must raise ``ValueError`` with an exact message."""
    exp_msg = "Input contains infinity or a value too large for dtype('float64')."

    with pytest.raises(ValueError) as excinfo:
        nan_euclidean_distances(X, Y=Y)

    # The full message is compared, not just a pattern.
    assert str(excinfo.value) == exp_msg
|
|
|
|
|
@pytest.mark.parametrize(
    "X, X_diag, missing_value",
    [
        # Missing values marked with NaN.
        (np.array([[0, 1], [1, 0]]), np.sqrt(2), np.nan),
        (np.array([[0, 1], [1, np.nan]]), np.sqrt(2), np.nan),
        (np.array([[np.nan, 1], [1, np.nan]]), np.nan, np.nan),
        (np.array([[np.nan, 1], [np.nan, 0]]), np.sqrt(2), np.nan),
        (np.array([[0, np.nan], [1, np.nan]]), np.sqrt(2), np.nan),
        # The same layouts with -1 as the missing-value marker.
        (np.array([[0, 1], [1, 0]]), np.sqrt(2), -1),
        (np.array([[0, 1], [1, -1]]), np.sqrt(2), -1),
        (np.array([[-1, 1], [1, -1]]), np.nan, -1),
        (np.array([[-1, 1], [-1, 0]]), np.sqrt(2), -1),
        (np.array([[0, -1], [1, -1]]), np.sqrt(2), -1),
    ],
)
def test_nan_euclidean_distances_2x2(X, X_diag, missing_value):
    """Check NaN-aware distances on 2x2 inputs with known results.

    ``X_diag`` is the expected distance between the two rows; each row is at
    distance 0 from itself.
    """
    exp_dist = np.array([[0.0, X_diag], [X_diag, 0]])

    # X against itself (implicit Y).
    implicit = nan_euclidean_distances(X, missing_values=missing_value)
    assert_allclose(exp_dist, implicit)

    # Squared variant.
    squared = nan_euclidean_distances(X, squared=True, missing_values=missing_value)
    assert_allclose(exp_dist**2, squared)

    # Y given explicitly, as the same object and then as a copy.
    for other in (X, X.copy()):
        explicit = nan_euclidean_distances(X, other, missing_values=missing_value)
        assert_allclose(exp_dist, explicit)
|
|
|
|
|
@pytest.mark.parametrize("missing_value", [np.nan, -1])
def test_nan_euclidean_distances_complete_nan(missing_value):
    """Check distances when one sample contains only missing values.

    Every distance involving the all-missing first row is expected to be NaN.
    """
    samples = np.array([[missing_value, missing_value], [0, 1]])
    expected = np.array([[np.nan, np.nan], [np.nan, 0]])

    # One-argument form first, then an explicit equal copy as second argument.
    for call_args in ((samples,), (samples, samples.copy())):
        result = nan_euclidean_distances(*call_args, missing_values=missing_value)
        assert_allclose(expected, result)
|
|
|
|
|
@pytest.mark.parametrize("missing_value", [np.nan, -1])
def test_nan_euclidean_distances_not_trival(missing_value):
    """Check ``nan_euclidean_distances`` on inputs with scattered missing values.

    Covers symmetry under argument swap, two hand-computed entries, the
    equivalence of the ``X``-only / ``(X, X)`` / ``(X, X.copy())`` call forms,
    and the agreement of ``copy=True`` with ``copy=False``.
    """
    X = np.array(
        [
            [1.0, missing_value, 3.0, 4.0, 2.0],
            [missing_value, 4.0, 6.0, 1.0, missing_value],
            [3.0, missing_value, missing_value, missing_value, 1.0],
        ]
    )

    Y = np.array(
        [
            [missing_value, 7.0, 7.0, missing_value, 2.0],
            [missing_value, missing_value, 5.0, 4.0, 7.0],
            [missing_value, missing_value, missing_value, 4.0, 5.0],
        ]
    )

    # Swapping the two inputs must transpose the distance matrix.
    D1 = nan_euclidean_distances(X, Y, missing_values=missing_value)
    D2 = nan_euclidean_distances(Y, X, missing_values=missing_value)
    assert_almost_equal(D1, D2.T)

    # Hand-computed entries: the squared differences over the 2 features
    # present in both rows are rescaled by 5 / 2 (5 features in total).
    assert_allclose(
        nan_euclidean_distances(
            X[:1], Y[:1], squared=True, missing_values=missing_value
        ),
        [[5.0 / 2.0 * ((7 - 3) ** 2 + (2 - 2) ** 2)]],
    )
    assert_allclose(
        nan_euclidean_distances(
            X[1:2], Y[1:2], squared=False, missing_values=missing_value
        ),
        [[np.sqrt(5.0 / 2.0 * ((6 - 5) ** 2 + (1 - 4) ** 2))]],
    )

    # The three ways of asking for the distances of X to itself must agree.
    D3 = nan_euclidean_distances(X, missing_values=missing_value)
    D4 = nan_euclidean_distances(X, X, missing_values=missing_value)
    D5 = nan_euclidean_distances(X, X.copy(), missing_values=missing_value)
    assert_allclose(D3, D4)
    assert_allclose(D4, D5)

    # copy=True and copy=False must agree. ``missing_values`` is forwarded so
    # that the -1 parametrization really exercises the missing-value handling
    # (with the default NaN marker, -1 would be treated as ordinary data).
    # NOTE: copy=False may modify X and Y in place, so this check stays last.
    D6 = nan_euclidean_distances(X, Y, copy=True, missing_values=missing_value)
    D7 = nan_euclidean_distances(X, Y, copy=False, missing_values=missing_value)
    assert_allclose(D6, D7)
|
|
|
|
|
@pytest.mark.parametrize("missing_value", [np.nan, -1])
def test_nan_euclidean_distances_one_feature_match_positive(missing_value):
    """Check the result is non-negative when rows share one identical feature.

    The two rows only have their first feature observed in common, and its
    value is the same in both, so squared distances must not be negative and
    the distances must all be zero.
    """
    X = np.array(
        [
            [-122.27, 648.0, missing_value, 37.85],
            [-122.27, missing_value, 2.34701493, missing_value],
        ]
    )
    common = {"missing_values": missing_value}

    squared_distances = nan_euclidean_distances(X, squared=True, **common)
    assert np.all(squared_distances >= 0)

    distances = nan_euclidean_distances(X, squared=False, **common)
    assert_allclose(distances, 0.0)
|
|
|
|
|
def test_cosine_distances():
    """Check that cosine distances stay in [0, 2] and vanish on the diagonal."""

    def assert_within_bounds(distances):
        # Cosine distances are expected to lie in the closed interval [0, 2].
        assert np.all(distances >= 0.0)
        assert np.all(distances <= 2.0)

    rng = np.random.RandomState(1337)
    base = np.abs(rng.rand(910))

    # Two identical rows: every pairwise distance should be (numerically) zero.
    same = cosine_distances(np.vstack([base, base]))
    assert_allclose(same, [[0.0, 0.0], [0.0, 0.0]], atol=1e-10)
    assert_within_bounds(same)
    assert_allclose(same[np.diag_indices_from(same)], [0.0, 0.0])

    # A row and its negation: distance 2 off the diagonal, 0 on it.
    opposite = cosine_distances(np.vstack([base, -base]))
    assert_within_bounds(opposite)
    assert_allclose(opposite, [[0.0, 2.0], [2.0, 0.0]])

    # Larger random non-negative matrix.
    large = np.abs(rng.rand(1000, 5000))
    large_dist = cosine_distances(large)
    assert_allclose(
        large_dist[np.diag_indices_from(large_dist)], [0.0] * large_dist.shape[0]
    )
    assert_within_bounds(large_dist)
|
|
|
|
|
def test_haversine_distances():
    """Compare ``haversine_distances`` with a point-by-point reference formula."""

    def reference_haversine(p, q):
        # Index 0 is used as the latitude and index 1 as the longitude.
        d_lat = q[0] - p[0]
        d_lon = q[1] - p[1]
        cos_product = np.cos(p[0]) * np.cos(q[0])
        a = np.sin(d_lat / 2) ** 2 + (cos_product * np.sin(d_lon / 2) ** 2)
        return 2 * np.arcsin(np.sqrt(a))

    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 2))
    Y = rng.random_sample((10, 2))

    expected = np.array([[reference_haversine(p, q) for q in Y] for p in X])
    assert_allclose(expected, haversine_distances(X, Y))

    # Inputs with three columns must be rejected.
    three_columns = rng.random_sample((10, 3))
    with pytest.raises(
        ValueError, match="Haversine distance only valid in 2 dimensions"
    ):
        haversine_distances(three_columns)
|
|
|
|
|
|
|
|
|
|
|
def test_paired_euclidean_distances():
    """Check row-by-row euclidean distances on a one-feature example."""
    left, right = [[0], [0]], [[1], [2]]
    assert_allclose(paired_euclidean_distances(left, right), [1.0, 2.0])
|
|
|
|
|
def test_paired_manhattan_distances():
    """Check row-by-row manhattan distances on a one-feature example."""
    left, right = [[0], [0]], [[1], [2]]
    assert_allclose(paired_manhattan_distances(left, right), [1.0, 2.0])
|
|
|
|
|
def test_paired_cosine_distances():
    """Check row-by-row cosine distances on a one-feature example."""
    left, right = [[0], [0]], [[1], [2]]
    assert_allclose(paired_cosine_distances(left, right), [0.5, 0.5])
|
|
|
|
|
def test_chi_square_kernel():
    """Check values, dtypes and error handling of the chi-squared kernels."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((10, 4))
    gamma = 0.1

    K_add = additive_chi2_kernel(X, Y)
    K = chi2_kernel(X, Y, gamma=gamma)
    assert K.dtype == float

    # Compare every entry with the closed-form expressions.
    for i, j in np.ndindex(X.shape[0], Y.shape[0]):
        x, y = X[i], Y[j]
        additive = -np.sum((x - y) ** 2 / (x + y))
        exponential = np.exp(gamma * additive)
        assert_almost_equal(K_add[i, j], additive)
        assert_almost_equal(K[i, j], exponential)

    # Kernel of a set with itself: ones on the diagonal ...
    K = chi2_kernel(Y)
    assert_array_equal(np.diag(K), 1)
    # ... and values strictly between 0 and 1 everywhere else.
    assert np.all(K > 0)
    off_diagonal = K - np.diag(np.diag(K))
    assert np.all(off_diagonal < 1)

    # float32 inputs give a float32 kernel.
    X = rng.random_sample((5, 4)).astype(np.float32)
    Y = rng.random_sample((10, 4)).astype(np.float32)
    assert chi2_kernel(X, Y).dtype == np.float32

    # Integer input is accepted and yields a finite float kernel. Casting
    # samples drawn by ``random_sample`` to int32 presumably leaves only zeros.
    X = rng.random_sample((10, 4)).astype(np.int32)
    K = chi2_kernel(X, X)
    assert np.isfinite(K).all()
    assert K.dtype == float

    # The closer pair of histograms must get the larger kernel value.
    X = [[0.3, 0.7], [1.0, 0]]
    Y = [[0, 1], [0.9, 0.1]]
    K = chi2_kernel(X, Y)
    assert K[0, 0] > K[0, 1]
    assert K[1, 1] > K[1, 0]

    # Negative entries in either argument are rejected.
    negative_inputs = (
        ([[0, -1]],),
        ([[0, -1]], [[-1, -1]]),
        ([[0, 1]], [[-1, -1]]),
    )
    for args in negative_inputs:
        with pytest.raises(ValueError):
            chi2_kernel(*args)

    # Mismatching numbers of features are rejected.
    with pytest.raises(ValueError):
        chi2_kernel([[0, 1]], [[0.2, 0.2, 0.6]])
|
|
|
|
|
@pytest.mark.parametrize(
    "kernel",
    (
        linear_kernel,
        polynomial_kernel,
        rbf_kernel,
        laplacian_kernel,
        sigmoid_kernel,
        cosine_similarity,
    ),
)
def test_kernel_symmetry(kernel):
    """Check that ``kernel(X, X)`` returns a symmetric matrix."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    K = kernel(X, X)
    # Rely on the default tolerances. The third positional argument of
    # ``assert_allclose`` is ``rtol``, so the former ``assert_allclose(K, K.T, 15)``
    # accepted a relative error of 1500% and could not detect asymmetry.
    assert_allclose(K, K.T)
|
|
|
|
|
@pytest.mark.parametrize(
    "kernel",
    (
        linear_kernel,
        polynomial_kernel,
        rbf_kernel,
        laplacian_kernel,
        sigmoid_kernel,
        cosine_similarity,
    ),
)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_kernel_sparse(kernel, csr_container):
    """Check that kernels give the same values for dense and CSR inputs."""
    rng = np.random.RandomState(0)
    dense = rng.random_sample((5, 4))
    sparse = csr_container(dense)

    from_dense = kernel(dense, dense)
    from_sparse = kernel(sparse, sparse)
    assert_allclose(from_dense, from_sparse)
|
|
|
|
|
def test_linear_kernel():
    """Check that the linear kernel diagonal holds the squared row norms."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    gram = linear_kernel(X, X)

    squared_norms = [linalg.norm(row) ** 2 for row in X]
    # Stepping by 6 through the flattened 5x5 matrix visits its diagonal.
    assert_allclose(gram.flat[:: X.shape[0] + 1], squared_norms)
|
|
|
|
|
def test_rbf_kernel():
    """Check that the RBF kernel of a set with itself has a unit diagonal."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    gram = rbf_kernel(X, X)

    # Stepping by 6 through the flattened 5x5 matrix visits its diagonal.
    assert_allclose(gram.flat[:: X.shape[0] + 1], np.ones(5))
|
|
|
|
|
def test_laplacian_kernel():
    """Check the diagonal and the value range of the laplacian kernel."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    gram = laplacian_kernel(X, X)

    # Ones on the diagonal ...
    diagonal = np.diag(gram)
    assert_allclose(diagonal, np.ones(5))

    # ... and values strictly between 0 and 1 off the diagonal.
    assert np.all(gram > 0)
    off_diagonal = gram - np.diag(diagonal)
    assert np.all(off_diagonal < 1)
|
|
|
|
|
@pytest.mark.parametrize(
    "metric, pairwise_func",
    [("linear", linear_kernel), ("cosine", cosine_similarity)],
)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_pairwise_similarity_sparse_output(metric, pairwise_func, csr_container):
    """Check the ``dense_output`` switch of the similarity functions."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((3, 4))

    # Sparse inputs with dense_output=False must give a sparse result.
    sparse_result = pairwise_func(
        csr_container(X), csr_container(Y), dense_output=False
    )
    assert issparse(sparse_result)

    # Dense inputs with dense_output=True must give the same values, densely.
    dense_result = pairwise_func(X, Y, dense_output=True)
    assert not issparse(dense_result)
    assert_allclose(sparse_result.toarray(), dense_result)

    # ``pairwise_kernels`` with the matching metric name must agree as well.
    via_pairwise_kernels = pairwise_kernels(X, Y=Y, metric=metric)
    assert_allclose(sparse_result.toarray(), via_pairwise_kernels)
|
|
|
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_cosine_similarity(csr_container):
    """Check that the cosine kernel equals the linear kernel of normalized data."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    Y = rng.random_sample((3, 4))
    Xcsr = csr_container(X)
    Ycsr = csr_container(Y)

    input_pairs = ((X, None), (X, Y), (Xcsr, None), (Xcsr, Ycsr))
    for first, second in input_pairs:
        cosine = pairwise_kernels(first, Y=second, metric="cosine")

        # Normalize the inputs, then take plain dot products.
        first_normed = normalize(first)
        second_normed = second if second is None else normalize(second)
        linear = pairwise_kernels(first_normed, Y=second_normed, metric="linear")

        assert_allclose(cosine, linear)
|
|
|
|
|
def test_check_dense_matrices():
    """Check that passing ``None`` as second array returns the first one twice."""
    XA = np.arange(40).reshape(5, 8)
    checked_a, checked_b = check_pairwise_arrays(XA, None)

    # Both outputs must be the very same object, with unchanged values.
    assert checked_a is checked_b
    assert_array_equal(XA, checked_a)
|
|
|
|
|
def test_check_XB_returned():
    """Check that both validated arrays keep the values of the inputs."""
    XA = np.arange(40).reshape(5, 8)

    # Pairwise check: the second array may have a different number of rows.
    XB = np.arange(32).reshape(4, 8)
    checked_a, checked_b = check_pairwise_arrays(XA, XB)
    assert_array_equal(XA, checked_a)
    assert_array_equal(XB, checked_b)

    # Paired check: here both arrays have the same shape.
    XB = np.arange(40).reshape(5, 8)
    checked_a, checked_b = check_paired_arrays(XA, XB)
    assert_array_equal(XA, checked_a)
    assert_array_equal(XB, checked_b)
|
|
|
|
|
def test_check_different_dimensions():
    """Check that incompatible array shapes raise ``ValueError``."""
    XA = np.arange(45).reshape(5, 9)

    # Different number of columns: rejected by the pairwise check.
    fewer_columns = np.arange(32).reshape(4, 8)
    with pytest.raises(ValueError):
        check_pairwise_arrays(XA, fewer_columns)

    # Same number of columns but different number of rows: rejected by the
    # paired check.
    fewer_rows = np.arange(4 * 9).reshape(4, 9)
    with pytest.raises(ValueError):
        check_paired_arrays(XA, fewer_rows)
|
|
|
|
|
def test_check_invalid_dimensions():
    """Check that arrays with invalid dimensions raise ``ValueError``.

    The original body asserted the same 2-D shape mismatch twice; the
    duplicate is replaced by a 1-D input so that two distinct failure modes
    are covered.
    """
    XB = np.arange(32).reshape(4, 8)

    # 2-D arrays whose numbers of columns differ (5 vs 8).
    XA = np.arange(45).reshape(9, 5)
    with pytest.raises(ValueError):
        check_pairwise_arrays(XA, XB)

    # A 1-D array is not a valid (n_samples, n_features) input.
    XA_1d = np.arange(45)
    with pytest.raises(ValueError):
        check_pairwise_arrays(XA_1d, XB)
|
|
|
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_check_sparse_arrays(csr_container):
    """Check that sparse inputs stay sparse and unchanged after validation."""

    def assert_sparse_and_equal(reference, candidate):
        assert issparse(candidate)
        assert abs(reference - candidate).sum() == 0

    rng = np.random.RandomState(0)
    XA_sparse = csr_container(rng.random_sample((5, 4)))
    XB_sparse = csr_container(rng.random_sample((5, 4)))

    # Two distinct sparse matrices.
    checked_a, checked_b = check_pairwise_arrays(XA_sparse, XB_sparse)
    assert_sparse_and_equal(XA_sparse, checked_a)
    assert_sparse_and_equal(XB_sparse, checked_b)

    # The same sparse matrix passed twice.
    checked_a, checked_a_again = check_pairwise_arrays(XA_sparse, XA_sparse)
    assert_sparse_and_equal(XA_sparse, checked_a)
    assert_sparse_and_equal(checked_a_again, checked_a)
|
|
|
|
|
def tuplify(X):
    """Recursively convert an array into nested tuples.

    Arrays with more than one dimension become a tuple of the tuplified
    sub-arrays; a one-dimensional array becomes a flat tuple of its items.
    """
    if len(X.shape) <= 1:
        return tuple(X)
    return tuple(map(tuplify, X))
|
|
|
|
|
def test_check_tuple_input():
    """Check that nested tuples are accepted and keep their values."""
    rng = np.random.RandomState(0)
    tuples_a = tuplify(rng.random_sample((5, 4)))
    tuples_b = tuplify(rng.random_sample((5, 4)))

    checked_a, checked_b = check_pairwise_arrays(tuples_a, tuples_b)

    assert_array_equal(tuples_a, checked_a)
    assert_array_equal(tuples_b, checked_b)
|
|
|
|
|
def test_check_preserve_type():
    """Check the dtype of validated arrays for float32 and mixed inputs."""
    XA = np.arange(40, dtype=np.float32).reshape(5, 8)
    XB = np.arange(40, dtype=np.float32).reshape(5, 8)

    # A single float32 array keeps its dtype.
    checked_a, checked_b = check_pairwise_arrays(XA, None)
    assert checked_a.dtype == np.float32

    # Two float32 arrays both keep their dtype.
    checked_a, checked_b = check_pairwise_arrays(XA, XB)
    assert checked_a.dtype == np.float32
    assert checked_b.dtype == np.float32

    # As soon as one side is float64, both outputs are float64, whichever
    # side it is.
    mixed_inputs = ((XA.astype(float), XB), (XA, XB.astype(float)))
    for first, second in mixed_inputs:
        checked_a, checked_b = check_pairwise_arrays(first, second)
        assert checked_a.dtype == float
        assert checked_b.dtype == float
|
|
|
|
|
@pytest.mark.parametrize("n_jobs", [1, 2])
@pytest.mark.parametrize("metric", ["seuclidean", "mahalanobis"])
@pytest.mark.parametrize(
    "dist_function", [pairwise_distances, pairwise_distances_chunked]
)
def test_pairwise_distances_data_derived_params(n_jobs, metric, dist_function):
    """Check metrics whose parameters are derived from the data when Y is None.

    The result is compared with scipy's ``pdist`` for every combination of
    ``n_jobs`` and of the chunked / non-chunked entry points.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10))
    reference = squareform(pdist(X, metric=metric))

    # A small working memory is configured while the distances are computed,
    # presumably so that the computation is split into several chunks.
    with config_context(working_memory=0.1):
        pieces = tuple(dist_function(X, metric=metric, n_jobs=n_jobs))
        computed = np.vstack(pieces)

    assert_allclose(computed, reference)
|
|
|
|
|
@pytest.mark.parametrize("metric", ["seuclidean", "mahalanobis"])
def test_pairwise_distances_data_derived_params_error(metric):
    """Check that 'V' / 'VI' must be given explicitly when Y is passed."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10))
    Y = rng.random_sample((100, 10))

    expected_pattern = rf"The '(V|VI)' parameter is required for the {metric} metric"
    with pytest.raises(ValueError, match=expected_pattern):
        pairwise_distances(X, Y, metric=metric)
|
|
|
|
|
@pytest.mark.parametrize(
    "metric",
    [
        "braycurtis",
        "canberra",
        "chebyshev",
        "correlation",
        "hamming",
        "mahalanobis",
        "minkowski",
        "seuclidean",
        "sqeuclidean",
        "cityblock",
        "cosine",
        "euclidean",
    ],
)
@pytest.mark.parametrize("y_is_x", [True, False], ids=["Y is X", "Y is not X"])
def test_numeric_pairwise_distances_datatypes(metric, global_dtype, y_is_x):
    """Compare ``pairwise_distances`` with scipy for several metrics and dtypes.

    The reference is ``pdist`` when ``Y`` is ``X`` and ``cdist`` otherwise.
    """
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4)).astype(global_dtype, copy=False)

    if y_is_x:
        Y = X
        reference = squareform(pdist(X, metric=metric))
        extra_params = {}
    else:
        Y = rng.random_sample((5, 4)).astype(global_dtype, copy=False)
        reference = cdist(X, Y, metric=metric)

        # With a distinct Y, the data-derived parameters of these two metrics
        # are computed here from the stacked inputs and passed explicitly.
        if metric == "seuclidean":
            extra_params = {
                "V": np.var(np.vstack([X, Y]), axis=0, ddof=1, dtype=np.float64)
            }
        elif metric == "mahalanobis":
            extra_params = {"VI": np.linalg.inv(np.cov(np.vstack([X, Y]).T)).T}
        else:
            extra_params = {}

    computed = pairwise_distances(X, Y, metric=metric, **extra_params)

    assert_allclose(computed, reference)
|
|
|
|
|
@pytest.mark.parametrize(
    "pairwise_distances_func",
    [pairwise_distances, pairwise_distances_argmin, pairwise_distances_argmin_min],
)
def test_nan_euclidean_support(pairwise_distances_func):
    """Check that `nan_euclidean` is lenient with `nan` values."""
    samples_with_nan = [[0, 1], [1, np.nan], [2, 3], [3, 5]]

    result = pairwise_distances_func(
        samples_with_nan, samples_with_nan, metric="nan_euclidean"
    )

    # No NaN may leak into the output.
    assert not np.isnan(result).any()
|
|
|
|
|
def test_nan_euclidean_constant_input_argmin():
    """Check that the behavior of constant input is the same in the case of
    full of nan vector and full of zero vector.
    """

    def argmin_against_itself(samples):
        return pairwise_distances_argmin(samples, samples, metric="nan_euclidean")

    all_nan = [[np.nan, np.nan], [np.nan, np.nan], [np.nan, np.nan]]
    from_nan = argmin_against_itself(all_nan)

    all_zero = [[0, 0], [0, 0], [0, 0]]
    from_zero = argmin_against_itself(all_zero)

    assert_allclose(from_nan, from_zero)
|
|
|
|
|
@pytest.mark.parametrize(
    "X,Y,expected_distance",
    [
        (
            ["a", "ab", "abc"],
            None,
            [[0.0, 1.0, 2.0], [1.0, 0.0, 1.0], [2.0, 1.0, 0.0]],
        ),
        (
            ["a", "ab", "abc"],
            ["a", "ab"],
            [[0.0, 1.0], [1.0, 0.0], [2.0, 1.0]],
        ),
    ],
)
def test_pairwise_dist_custom_metric_for_string(X, Y, expected_distance):
    """Check pairwise_distances with lists of strings as input."""

    def length_gap(first, second):
        # Distance between two strings: absolute difference of their lengths.
        return np.abs(len(first) - len(second))

    computed = pairwise_distances(X=X, Y=Y, metric=length_gap)
    assert_allclose(computed, expected_distance)
|
|
|
|
|
def test_pairwise_dist_custom_metric_for_bool():
    """Check that pairwise_distances does not convert boolean input to float
    when using a custom metric.
    """

    def one_minus_overlap(u, v):
        # ``&`` and ``|`` are used on the rows, so they must still be boolean
        # (these operators are not defined for float arrays).
        shared = (u & v).sum()
        either = (u | v).sum()
        return 1 - shared / either

    X = np.array([[1, 0, 0, 0], [1, 0, 1, 0], [1, 1, 1, 1]], dtype=bool)
    expected = np.array(
        [
            [0.0, 0.5, 0.75],
            [0.5, 0.0, 0.5],
            [0.75, 0.5, 0.0],
        ]
    )

    computed = pairwise_distances(X=X, metric=one_minus_overlap)
    assert_allclose(computed, expected)
|
|
|
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_sparse_manhattan_readonly_dataset(csr_container):
    """Check that ``manhattan_distances`` runs on sparse data through joblib.

    The test only requires the parallel call to complete without raising.
    """
    left_matrices = [csr_container(np.ones((5, 5)))]
    right_matrices = [csr_container(np.ones((5, 5)))]

    # NOTE(review): max_nbytes=0 presumably makes joblib hand the workers
    # memory-mapped, read-only buffers — confirm against the joblib docs.
    runner = Parallel(n_jobs=2, max_nbytes=0)
    runner(
        delayed(manhattan_distances)(left, right)
        for left, right in zip(left_matrices, right_matrices)
    )
|
|
|
|
|
|
|
def test_force_all_finite_rename_warning():
    """Check that the deprecated ``force_all_finite`` keyword warns.

    Both ``check_pairwise_arrays`` and ``pairwise_distances`` must emit a
    ``FutureWarning`` pointing to the new ``ensure_all_finite`` name.
    """
    # Use a seeded generator, like the other tests of this module, instead of
    # the global ``np.random`` state so that the inputs are reproducible.
    rng = np.random.RandomState(0)
    X = rng.uniform(size=(10, 10))
    Y = rng.uniform(size=(10, 10))

    msg = "'force_all_finite' was renamed to 'ensure_all_finite'"

    with pytest.warns(FutureWarning, match=msg):
        check_pairwise_arrays(X, Y, force_all_finite=True)

    with pytest.warns(FutureWarning, match=msg):
        pairwise_distances(X, Y, force_all_finite=True)
|
|