|
""" |
|
Tests for DBSCAN clustering algorithm |
|
""" |
|
|
|
import pickle |
|
import warnings |
|
|
|
import numpy as np |
|
import pytest |
|
from scipy.spatial import distance |
|
|
|
from sklearn.cluster import DBSCAN, dbscan |
|
from sklearn.cluster.tests.common import generate_clustered_data |
|
from sklearn.metrics.pairwise import pairwise_distances |
|
from sklearn.neighbors import NearestNeighbors |
|
from sklearn.utils._testing import assert_array_equal |
|
from sklearn.utils.fixes import CSR_CONTAINERS, LIL_CONTAINERS |
|
|
|
# Module-level fixture: a synthetic data set generated for ``n_clusters``
# clusters. The tests below that do not build their own input cluster ``X``
# and compare the number of clusters found against ``n_clusters``.
n_clusters = 3
X = generate_clustered_data(n_clusters=n_clusters)
|
|
|
|
|
def test_dbscan_similarity():
    """Cluster a precomputed distance matrix and check the cluster count.

    Both the ``dbscan`` function and the ``DBSCAN`` estimator must find
    ``n_clusters`` clusters once the noise label ``-1`` is discounted.
    """
    radius = 0.15
    min_pts = 10

    # Dense pairwise distance matrix, rescaled so its largest entry is 1.
    dist = distance.squareform(distance.pdist(X))
    dist /= np.max(dist)

    # Functional interface; the returned core sample indices are not needed.
    _, func_labels = dbscan(
        dist, metric="precomputed", eps=radius, min_samples=min_pts
    )
    found_by_function = len(set(func_labels)) - int(-1 in func_labels)
    assert found_by_function == n_clusters

    # Estimator interface with the same parameters.
    estimator = DBSCAN(metric="precomputed", eps=radius, min_samples=min_pts)
    est_labels = estimator.fit(dist).labels_
    found_by_estimator = len(set(est_labels)) - int(-1 in est_labels)
    assert found_by_estimator == n_clusters
|
|
|
|
|
def test_dbscan_feature():
    """Cluster the raw feature matrix with the Euclidean metric.

    The ``dbscan`` function and the ``DBSCAN`` estimator are run with the same
    parameters and must both find ``n_clusters`` clusters (noise excluded).
    """
    params = {"metric": "euclidean", "eps": 0.8, "min_samples": 10}

    # Functional interface; core sample indices are ignored here.
    _, func_labels = dbscan(X, **params)
    assert len(set(func_labels)) - int(-1 in func_labels) == n_clusters

    # Estimator interface.
    est_labels = DBSCAN(**params).fit(X).labels_
    assert len(set(est_labels)) - int(-1 in est_labels) == n_clusters
|
|
|
|
|
@pytest.mark.parametrize("lil_container", LIL_CONTAINERS)
def test_dbscan_sparse(lil_container):
    """Sparse (LIL) input must give the same result as the dense input."""
    params = {"eps": 0.8, "min_samples": 10}

    sparse_core, sparse_labels = dbscan(lil_container(X), **params)
    dense_core, dense_labels = dbscan(X, **params)

    assert_array_equal(dense_core, sparse_core)
    assert_array_equal(dense_labels, sparse_labels)
|
|
|
|
|
@pytest.mark.parametrize("include_self", [False, True])
def test_dbscan_sparse_precomputed(include_self):
    """A sparse precomputed neighbors graph must match dense distances.

    ``include_self=True`` queries the radius-neighbors graph with the training
    data itself; ``include_self=False`` passes ``None`` as the query instead.
    """
    dense_dist = pairwise_distances(X)
    neighbors = NearestNeighbors(radius=0.9).fit(X)

    if include_self:
        query = X
    else:
        query = None
    sparse_dist = neighbors.radius_neighbors_graph(X=query, mode="distance")

    # The graph must be genuinely sparse: fewer stored entries than there are
    # off-diagonal (i, j) pairs.
    n_samples = dense_dist.shape[0]
    assert sparse_dist.nnz < n_samples * (n_samples - 1)

    params = {"eps": 0.8, "min_samples": 10, "metric": "precomputed"}
    sparse_core, sparse_labels = dbscan(sparse_dist, **params)
    dense_core, dense_labels = dbscan(dense_dist, **params)

    assert_array_equal(dense_core, sparse_core)
    assert_array_equal(dense_labels, sparse_labels)
|
|
|
|
|
def test_dbscan_sparse_precomputed_different_eps():
    """A graph built with a larger radius must not change the clustering.

    The neighbors graph is computed once with a radius equal to ``eps`` and
    once with a larger radius; clustering both with the same ``eps`` has to
    return identical core samples and labels.
    """
    eps = 0.2

    outcomes = []
    for radius in (eps, eps + 0.7):
        neighbors = NearestNeighbors(radius=radius).fit(X)
        graph = neighbors.radius_neighbors_graph(X, mode="distance")
        # ``eps`` stays at the lower value regardless of the graph radius.
        outcomes.append(dbscan(graph, eps=eps, metric="precomputed"))

    (core_lower, labels_lower), (core_higher, labels_higher) = outcomes
    assert_array_equal(core_lower, core_higher)
    assert_array_equal(labels_lower, labels_higher)
|
|
|
|
|
@pytest.mark.parametrize("metric", ["precomputed", "minkowski"])
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS + [None])
def test_dbscan_input_not_modified(metric, csr_container):
    """Check that ``dbscan`` leaves its input untouched.

    ``csr_container=None`` exercises the dense array; any other value wraps
    the same data in that sparse container first.
    """
    data = np.random.RandomState(0).rand(10, 10)
    if csr_container is not None:
        data = csr_container(data)
    snapshot = data.copy()

    dbscan(data, metric=metric)

    if csr_container is None:
        assert_array_equal(data, snapshot)
    else:
        # Compare densified versions of the sparse input and its copy.
        assert_array_equal(data.toarray(), snapshot.toarray())
|
|
|
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_dbscan_input_not_modified_precomputed_sparse_nodiag(csr_container):
    """Check that a precomputed sparse matrix is not modified in-place.

    The input deliberately has no stored diagonal entries.

    Non-regression test for:
    https://github.com/scikit-learn/scikit-learn/issues/27508
    """
    dense = np.random.RandomState(0).rand(10, 10)

    # Zero the diagonal before conversion so the sparse matrix holds no
    # diagonal entry; the assertion below verifies exactly that.
    np.fill_diagonal(dense, 0)
    sparse = csr_container(dense)
    rows, cols = sparse.nonzero()
    assert all(row != col for row, col in zip(rows, cols))

    snapshot = sparse.copy()
    dbscan(sparse, metric="precomputed")

    # The number of stored entries must be unchanged, in addition to the
    # values themselves being equal.
    assert sparse.nnz == snapshot.nnz
    assert_array_equal(sparse.toarray(), snapshot.toarray())
|
|
|
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_dbscan_no_core_samples(csr_container):
    """Check the fitted attributes when no sample qualifies as a core sample.

    The same data is fitted both as a dense array and wrapped in
    ``csr_container``.
    """
    rng = np.random.RandomState(0)
    dense = rng.rand(40, 10)
    # Zero out every entry below 0.8.
    dense[dense < 0.8] = 0

    for data in (dense, csr_container(dense)):
        model = DBSCAN(min_samples=6).fit(data)
        # No components, every label is noise, and no core indices.
        assert_array_equal(model.components_, np.empty((0, data.shape[1])))
        assert_array_equal(model.labels_, -1)
        assert model.core_sample_indices_.shape == (0,)
|
|
|
|
|
def test_dbscan_callable():
    """Cluster with a callable metric instead of a metric name.

    ``scipy.spatial.distance.euclidean`` is passed as the metric together with
    the ``"ball_tree"`` algorithm, through both the function and the estimator.
    """
    params = {
        "metric": distance.euclidean,
        "eps": 0.8,
        "min_samples": 10,
        "algorithm": "ball_tree",
    }

    # Functional interface; core sample indices are not inspected.
    _, func_labels = dbscan(X, **params)
    assert len(set(func_labels)) - int(-1 in func_labels) == n_clusters

    # Estimator interface.
    est_labels = DBSCAN(**params).fit(X).labels_
    assert len(set(est_labels)) - int(-1 in est_labels) == n_clusters
|
|
|
|
|
def test_dbscan_metric_params():
    """Check how ``metric_params`` and ``p`` interact for Minkowski distance.

    Four fits are compared and must all produce the same core samples and
    labels: ``p`` given through ``metric_params``, ``p`` given directly, the
    ``"manhattan"`` metric, and conflicting values for ``p`` (where a
    ``SyntaxWarning`` is expected).
    """
    shared = {"eps": 0.8, "min_samples": 10, "algorithm": "ball_tree"}
    order = 1

    def _fit(**metric_kwargs):
        # Fit on the module data and return (core sample indices, labels).
        model = DBSCAN(**metric_kwargs, **shared).fit(X)
        return model.core_sample_indices_, model.labels_

    # ``p`` only through ``metric_params`` (``p=None``): no warning allowed.
    with warnings.catch_warnings(record=True) as caught:
        core_ref, labels_ref = _fit(
            metric="minkowski", metric_params={"p": order}, p=None
        )
    assert not caught, caught[0].message

    # ``p`` passed directly to the estimator.
    core_direct, labels_direct = _fit(metric="minkowski", p=order)
    assert_array_equal(core_ref, core_direct)
    assert_array_equal(labels_ref, labels_direct)

    # The named "manhattan" metric must agree with Minkowski at p=1.
    core_manhattan, labels_manhattan = _fit(metric="manhattan")
    assert_array_equal(core_ref, core_manhattan)
    assert_array_equal(labels_ref, labels_manhattan)

    # Conflicting values: the one in ``metric_params`` is expected to be used
    # and a warning raised about the ignored ``p``.
    expected_message = (
        "Parameter p is found in metric_params. "
        "The corresponding parameter from __init__ "
        "is ignored."
    )
    with pytest.warns(SyntaxWarning, match=expected_message):
        core_conflict, labels_conflict = _fit(
            metric="minkowski", metric_params={"p": order}, p=order + 1
        )

    assert_array_equal(core_ref, core_conflict)
    assert_array_equal(labels_ref, labels_conflict)
|
|
|
|
|
def test_dbscan_balltree():
    """Check the cluster count across tree-based neighbor searches.

    A precomputed-distance run and several ``DBSCAN`` configurations (ball
    tree and k-d tree, different ``p`` and ``leaf_size``) must each find
    ``n_clusters`` clusters.
    """
    eps = 0.8
    min_samples = 10

    def _n_found(labels):
        # Number of distinct labels, not counting the noise label -1.
        return len(set(labels)) - int(-1 in labels)

    # Baseline on the dense pairwise distance matrix.
    dist = pairwise_distances(X)
    _, labels = dbscan(dist, metric="precomputed", eps=eps, min_samples=min_samples)
    assert _n_found(labels) == n_clusters

    variants = (
        {"p": 2.0, "algorithm": "ball_tree"},
        {"p": 2.0, "algorithm": "kd_tree"},
        {"p": 1.0, "algorithm": "ball_tree"},
        {"leaf_size": 20, "algorithm": "ball_tree"},
    )
    for variant in variants:
        model = DBSCAN(eps=eps, min_samples=min_samples, **variant)
        assert _n_found(model.fit(X).labels_) == n_clusters
|
|
|
|
|
def test_input_validation():
    """Fitting ``DBSCAN`` on a plain list of lists must not raise."""
    samples = [[1.0, 2.0], [3.0, 4.0]]
    DBSCAN().fit(samples)
|
|
|
|
|
def test_pickle():
    """A default ``DBSCAN`` must round-trip through pickle with its class."""
    estimator = DBSCAN()
    restored = pickle.loads(pickle.dumps(estimator))
    assert type(restored) is estimator.__class__
|
|
|
|
|
def test_boundaries():
    """Check the inclusive boundaries of ``min_samples`` and ``eps``."""
    # Two points within eps of each other: with min_samples=2 the first point
    # is expected to be core, i.e. the point itself counts towards the total.
    core_pair, _ = dbscan([[0], [1]], eps=2, min_samples=2)
    assert 0 in core_pair

    # Neighbors at distance exactly eps are expected to count ...
    core_on_radius, _ = dbscan([[0], [1], [1]], eps=1, min_samples=2)
    assert 0 in core_on_radius

    # ... but not when eps is slightly smaller than that distance.
    core_inside_radius, _ = dbscan([[0], [1], [1]], eps=0.99, min_samples=2)
    assert 0 not in core_inside_radius
|
|
|
|
|
def test_weighted_dbscan(global_random_seed):
    """Check validation and effect of ``sample_weight`` in DBSCAN.

    Covers: length validation, toy two-point cases with various weights,
    equivalence between integer weights and repeated samples, precomputed
    distances, and the estimator's ``fit`` / ``fit_predict`` methods.
    """

    def _as_mask(indices, size):
        # Boolean vector of length ``size`` that is True at ``indices``.
        flags = np.zeros(size, dtype=bool)
        flags[indices] = True
        return flags

    # sample_weight of the wrong length must be rejected.
    for bad_weight in ([2], [2, 3, 4]):
        with pytest.raises(ValueError):
            dbscan([[0], [1]], sample_weight=bad_weight)

    # Default eps: each point only becomes core through its own weight.
    own_weight_cases = (
        (None, []),
        ([5, 5], []),
        ([6, 5], [0]),
        ([6, 6], [0, 1]),
    )
    for weight, expected_core in own_weight_cases:
        core, _ = dbscan([[0], [1]], sample_weight=weight, min_samples=6)
        assert_array_equal(expected_core, core)

    # eps=1.5: the two points are in each other's neighborhood, so their
    # weights add up. Also covers zero, fractional and negative weights.
    shared_weight_cases = (
        ([5, 1], [0, 1]),
        ([5, 0], []),
        ([5.9, 0.1], [0, 1]),
        ([6, 0], [0, 1]),
        ([6, -1], []),
    )
    for weight, expected_core in shared_weight_cases:
        core, _ = dbscan([[0], [1]], eps=1.5, sample_weight=weight, min_samples=6)
        assert_array_equal(expected_core, core)

    # Non-negative integer weights must behave like repeating the samples.
    rng = np.random.RandomState(global_random_seed)
    weights = rng.randint(0, 5, X.shape[0])
    core_ref, labels_ref = dbscan(X, sample_weight=weights)
    assert len(labels_ref) == len(X)

    repeated = np.repeat(X, weights, axis=0)
    core_repeated, _ = dbscan(repeated)
    assert_array_equal(
        np.repeat(_as_mask(core_ref, X.shape[0]), weights),
        _as_mask(core_repeated, repeated.shape[0]),
    )

    # Weights combined with a precomputed distance matrix.
    dist = pairwise_distances(X)
    core_pre, labels_pre = dbscan(dist, sample_weight=weights, metric="precomputed")
    assert_array_equal(core_ref, core_pre)
    assert_array_equal(labels_ref, labels_pre)

    # Weights passed to the estimator's fit.
    fitted = DBSCAN().fit(X, sample_weight=weights)
    assert_array_equal(core_ref, fitted.core_sample_indices_)
    assert_array_equal(labels_ref, fitted.labels_)

    # Weights passed to the estimator's fit_predict.
    predictor = DBSCAN()
    predicted = predictor.fit_predict(X, sample_weight=weights)
    assert_array_equal(core_ref, predictor.core_sample_indices_)
    assert_array_equal(labels_ref, predicted)
    assert_array_equal(labels_ref, predictor.labels_)
|
|
|
|
|
@pytest.mark.parametrize("algorithm", ["brute", "kd_tree", "ball_tree"])
def test_dbscan_core_samples_toy(algorithm):
    """Check core samples and labels on a 1-D toy set for rising min_samples.

    With ``eps=1`` only the samples 2, 3 and 4 have neighbors other than
    themselves; all remaining samples are isolated.
    """
    samples = [[0], [2], [3], [4], [6], [8], [10]]
    n_samples = len(samples)

    # (min_samples, expected core sample indices, expected labels)
    cases = (
        # Every sample is a core sample: isolated ones form their own cluster.
        (1, np.arange(n_samples), [0, 1, 1, 1, 2, 3, 4]),
        # Only the three samples of the dense area are core; the rest is noise.
        (2, [1, 2, 3], [-1, 0, 0, 0, -1, -1, -1]),
        # Only the middle sample of the dense area is core; its two neighbors
        # still belong to the cluster, the rest is noise.
        (3, [2], [-1, 0, 0, 0, -1, -1, -1]),
        # No core sample can be found any more: everything is noise.
        (4, [], np.full(n_samples, -1.0)),
    )
    for min_samples, expected_core, expected_labels in cases:
        core, labels = dbscan(
            samples, algorithm=algorithm, eps=1, min_samples=min_samples
        )
        assert_array_equal(core, expected_core)
        assert_array_equal(labels, expected_labels)
|
|
|
|
|
def test_dbscan_precomputed_metric_with_degenerate_input_arrays():
    """Degenerate precomputed matrices must yield a single distinct label.

    Checked for an identity matrix and for an all-zero matrix.
    """
    for dist in (np.eye(10), np.zeros((10, 10))):
        model = DBSCAN(eps=0.5, metric="precomputed").fit(dist)
        assert len(set(model.labels_)) == 1
|
|
|
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_dbscan_precomputed_metric_with_initial_rows_zero(csr_container):
    """Check a sparse precomputed matrix whose first rows are all zeros.

    The first two samples have no non-zero entries and are expected to be
    labelled as noise; the remaining samples form two clusters.
    """
    dense = np.array(
        [
            [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
            [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
            [0.0, 0.0, 0.0, 0.0, 0.1, 0.0, 0.0],
            [0.0, 0.0, 0.0, 0.0, 0.1, 0.0, 0.0],
            [0.0, 0.0, 0.1, 0.1, 0.0, 0.0, 0.3],
            [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.1],
            [0.0, 0.0, 0.0, 0.0, 0.3, 0.1, 0.0],
        ]
    )
    sparse = csr_container(dense)

    model = DBSCAN(eps=0.2, metric="precomputed", min_samples=2)
    found = model.fit(sparse).labels_
    assert_array_equal(found, [-1, -1, 0, 0, 0, 1, 1])
|
|