spam-classifier
/
venv
/lib
/python3.11
/site-packages
/scipy
/interpolate
/tests
/test_rbfinterp.py
import pickle | |
import pytest | |
import numpy as np | |
from numpy.linalg import LinAlgError | |
from scipy._lib._array_api import xp_assert_close | |
from scipy.stats.qmc import Halton | |
from scipy.spatial import cKDTree # type: ignore[attr-defined] | |
from scipy.interpolate._rbfinterp import ( | |
_AVAILABLE, _SCALE_INVARIANT, _NAME_TO_MIN_DEGREE, _monomial_powers, | |
RBFInterpolator | |
) | |
from scipy.interpolate import _rbfinterp_pythran | |
from scipy._lib._testutils import _run_concurrent_barrier | |
def _vandermonde(x, degree): | |
# Returns a matrix of monomials that span polynomials with the specified | |
# degree evaluated at x. | |
powers = _monomial_powers(x.shape[1], degree) | |
return _rbfinterp_pythran._polynomial_matrix(x, powers) | |
def _1d_test_function(x): | |
# Test function used in Wahba's "Spline Models for Observational Data". | |
# domain ~= (0, 3), range ~= (-1.0, 0.2) | |
x = x[:, 0] | |
y = 4.26*(np.exp(-x) - 4*np.exp(-2*x) + 3*np.exp(-3*x)) | |
return y | |
def _2d_test_function(x): | |
# Franke's test function. | |
# domain ~= (0, 1) X (0, 1), range ~= (0.0, 1.2) | |
x1, x2 = x[:, 0], x[:, 1] | |
term1 = 0.75 * np.exp(-(9*x1-2)**2/4 - (9*x2-2)**2/4) | |
term2 = 0.75 * np.exp(-(9*x1+1)**2/49 - (9*x2+1)/10) | |
term3 = 0.5 * np.exp(-(9*x1-7)**2/4 - (9*x2-3)**2/4) | |
term4 = -0.2 * np.exp(-(9*x1-4)**2 - (9*x2-7)**2) | |
y = term1 + term2 + term3 + term4 | |
return y | |
def _is_conditionally_positive_definite(kernel, m): | |
# Tests whether the kernel is conditionally positive definite of order m. | |
# See chapter 7 of Fasshauer's "Meshfree Approximation Methods with | |
# MATLAB". | |
nx = 10 | |
ntests = 100 | |
for ndim in [1, 2, 3, 4, 5]: | |
# Generate sample points with a Halton sequence to avoid samples that | |
# are too close to each other, which can make the matrix singular. | |
seq = Halton(ndim, scramble=False, seed=np.random.RandomState()) | |
for _ in range(ntests): | |
x = 2*seq.random(nx) - 1 | |
A = _rbfinterp_pythran._kernel_matrix(x, kernel) | |
P = _vandermonde(x, m - 1) | |
Q, R = np.linalg.qr(P, mode='complete') | |
# Q2 forms a basis spanning the space where P.T.dot(x) = 0. Project | |
# A onto this space, and then see if it is positive definite using | |
# the Cholesky decomposition. If not, then the kernel is not c.p.d. | |
# of order m. | |
Q2 = Q[:, P.shape[1]:] | |
B = Q2.T.dot(A).dot(Q2) | |
try: | |
np.linalg.cholesky(B) | |
except np.linalg.LinAlgError: | |
return False | |
return True | |
# Sorting the parametrize arguments is necessary to avoid a parallelization | |
# issue described here: https://github.com/pytest-dev/pytest-xdist/issues/432. | |
def test_conditionally_positive_definite(kernel): | |
# Test if each kernel in _AVAILABLE is conditionally positive definite of | |
# order m, where m comes from _NAME_TO_MIN_DEGREE. This is a necessary | |
# condition for the smoothed RBF interpolant to be well-posed in general. | |
m = _NAME_TO_MIN_DEGREE.get(kernel, -1) + 1 | |
assert _is_conditionally_positive_definite(kernel, m) | |
class _TestRBFInterpolator: | |
def test_scale_invariance_1d(self, kernel): | |
# Verify that the functions in _SCALE_INVARIANT are insensitive to the | |
# shape parameter (when smoothing == 0) in 1d. | |
seq = Halton(1, scramble=False, seed=np.random.RandomState()) | |
x = 3*seq.random(50) | |
y = _1d_test_function(x) | |
xitp = 3*seq.random(50) | |
yitp1 = self.build(x, y, epsilon=1.0, kernel=kernel)(xitp) | |
yitp2 = self.build(x, y, epsilon=2.0, kernel=kernel)(xitp) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_scale_invariance_2d(self, kernel): | |
# Verify that the functions in _SCALE_INVARIANT are insensitive to the | |
# shape parameter (when smoothing == 0) in 2d. | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
x = seq.random(100) | |
y = _2d_test_function(x) | |
xitp = seq.random(100) | |
yitp1 = self.build(x, y, epsilon=1.0, kernel=kernel)(xitp) | |
yitp2 = self.build(x, y, epsilon=2.0, kernel=kernel)(xitp) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_extreme_domains(self, kernel): | |
# Make sure the interpolant remains numerically stable for very | |
# large/small domains. | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
scale = 1e50 | |
shift = 1e55 | |
x = seq.random(100) | |
y = _2d_test_function(x) | |
xitp = seq.random(100) | |
if kernel in _SCALE_INVARIANT: | |
yitp1 = self.build(x, y, kernel=kernel)(xitp) | |
yitp2 = self.build( | |
x*scale + shift, y, | |
kernel=kernel | |
)(xitp*scale + shift) | |
else: | |
yitp1 = self.build(x, y, epsilon=5.0, kernel=kernel)(xitp) | |
yitp2 = self.build( | |
x*scale + shift, y, | |
epsilon=5.0/scale, | |
kernel=kernel | |
)(xitp*scale + shift) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_polynomial_reproduction(self): | |
# If the observed data comes from a polynomial, then the interpolant | |
# should be able to reproduce the polynomial exactly, provided that | |
# `degree` is sufficiently high. | |
rng = np.random.RandomState(0) | |
seq = Halton(2, scramble=False, seed=rng) | |
degree = 3 | |
x = seq.random(50) | |
xitp = seq.random(50) | |
P = _vandermonde(x, degree) | |
Pitp = _vandermonde(xitp, degree) | |
poly_coeffs = rng.normal(0.0, 1.0, P.shape[1]) | |
y = P.dot(poly_coeffs) | |
yitp1 = Pitp.dot(poly_coeffs) | |
yitp2 = self.build(x, y, degree=degree)(xitp) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_chunking(self, monkeypatch): | |
# If the observed data comes from a polynomial, then the interpolant | |
# should be able to reproduce the polynomial exactly, provided that | |
# `degree` is sufficiently high. | |
rng = np.random.RandomState(0) | |
seq = Halton(2, scramble=False, seed=rng) | |
degree = 3 | |
largeN = 1000 + 33 | |
# this is large to check that chunking of the RBFInterpolator is tested | |
x = seq.random(50) | |
xitp = seq.random(largeN) | |
P = _vandermonde(x, degree) | |
Pitp = _vandermonde(xitp, degree) | |
poly_coeffs = rng.normal(0.0, 1.0, P.shape[1]) | |
y = P.dot(poly_coeffs) | |
yitp1 = Pitp.dot(poly_coeffs) | |
interp = self.build(x, y, degree=degree) | |
ce_real = interp._chunk_evaluator | |
def _chunk_evaluator(*args, **kwargs): | |
kwargs.update(memory_budget=100) | |
return ce_real(*args, **kwargs) | |
monkeypatch.setattr(interp, '_chunk_evaluator', _chunk_evaluator) | |
yitp2 = interp(xitp) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_vector_data(self): | |
# Make sure interpolating a vector field is the same as interpolating | |
# each component separately. | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
x = seq.random(100) | |
xitp = seq.random(100) | |
y = np.array([_2d_test_function(x), | |
_2d_test_function(x[:, ::-1])]).T | |
yitp1 = self.build(x, y)(xitp) | |
yitp2 = self.build(x, y[:, 0])(xitp) | |
yitp3 = self.build(x, y[:, 1])(xitp) | |
xp_assert_close(yitp1[:, 0], yitp2) | |
xp_assert_close(yitp1[:, 1], yitp3) | |
def test_complex_data(self): | |
# Interpolating complex input should be the same as interpolating the | |
# real and complex components. | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
x = seq.random(100) | |
xitp = seq.random(100) | |
y = _2d_test_function(x) + 1j*_2d_test_function(x[:, ::-1]) | |
yitp1 = self.build(x, y)(xitp) | |
yitp2 = self.build(x, y.real)(xitp) | |
yitp3 = self.build(x, y.imag)(xitp) | |
xp_assert_close(yitp1.real, yitp2) | |
xp_assert_close(yitp1.imag, yitp3) | |
def test_interpolation_misfit_1d(self, kernel): | |
# Make sure that each kernel, with its default `degree` and an | |
# appropriate `epsilon`, does a good job at interpolation in 1d. | |
seq = Halton(1, scramble=False, seed=np.random.RandomState()) | |
x = 3*seq.random(50) | |
xitp = 3*seq.random(50) | |
y = _1d_test_function(x) | |
ytrue = _1d_test_function(xitp) | |
yitp = self.build(x, y, epsilon=5.0, kernel=kernel)(xitp) | |
mse = np.mean((yitp - ytrue)**2) | |
assert mse < 1.0e-4 | |
def test_interpolation_misfit_2d(self, kernel): | |
# Make sure that each kernel, with its default `degree` and an | |
# appropriate `epsilon`, does a good job at interpolation in 2d. | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
x = seq.random(100) | |
xitp = seq.random(100) | |
y = _2d_test_function(x) | |
ytrue = _2d_test_function(xitp) | |
yitp = self.build(x, y, epsilon=5.0, kernel=kernel)(xitp) | |
mse = np.mean((yitp - ytrue)**2) | |
assert mse < 2.0e-4 | |
def test_smoothing_misfit(self, kernel): | |
# Make sure we can find a smoothing parameter for each kernel that | |
# removes a sufficient amount of noise. | |
rng = np.random.RandomState(0) | |
seq = Halton(1, scramble=False, seed=rng) | |
noise = 0.2 | |
rmse_tol = 0.1 | |
smoothing_range = 10**np.linspace(-4, 1, 20) | |
x = 3*seq.random(100) | |
y = _1d_test_function(x) + rng.normal(0.0, noise, (100,)) | |
ytrue = _1d_test_function(x) | |
rmse_within_tol = False | |
for smoothing in smoothing_range: | |
ysmooth = self.build( | |
x, y, | |
epsilon=1.0, | |
smoothing=smoothing, | |
kernel=kernel)(x) | |
rmse = np.sqrt(np.mean((ysmooth - ytrue)**2)) | |
if rmse < rmse_tol: | |
rmse_within_tol = True | |
break | |
assert rmse_within_tol | |
def test_array_smoothing(self): | |
# Test using an array for `smoothing` to give less weight to a known | |
# outlier. | |
rng = np.random.RandomState(0) | |
seq = Halton(1, scramble=False, seed=rng) | |
degree = 2 | |
x = seq.random(50) | |
P = _vandermonde(x, degree) | |
poly_coeffs = rng.normal(0.0, 1.0, P.shape[1]) | |
y = P.dot(poly_coeffs) | |
y_with_outlier = np.copy(y) | |
y_with_outlier[10] += 1.0 | |
smoothing = np.zeros((50,)) | |
smoothing[10] = 1000.0 | |
yitp = self.build(x, y_with_outlier, smoothing=smoothing)(x) | |
# Should be able to reproduce the uncorrupted data almost exactly. | |
xp_assert_close(yitp, y, atol=1e-4) | |
def test_inconsistent_x_dimensions_error(self): | |
# ValueError should be raised if the observation points and evaluation | |
# points have a different number of dimensions. | |
y = Halton(2, scramble=False, seed=np.random.RandomState()).random(10) | |
d = _2d_test_function(y) | |
x = Halton(1, scramble=False, seed=np.random.RandomState()).random(10) | |
match = 'Expected the second axis of `x`' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d)(x) | |
def test_inconsistent_d_length_error(self): | |
y = np.linspace(0, 1, 5)[:, None] | |
d = np.zeros(1) | |
match = 'Expected the first axis of `d`' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d) | |
def test_y_not_2d_error(self): | |
y = np.linspace(0, 1, 5) | |
d = np.zeros(5) | |
match = '`y` must be a 2-dimensional array.' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d) | |
def test_inconsistent_smoothing_length_error(self): | |
y = np.linspace(0, 1, 5)[:, None] | |
d = np.zeros(5) | |
smoothing = np.ones(1) | |
match = 'Expected `smoothing` to be' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d, smoothing=smoothing) | |
def test_invalid_kernel_name_error(self): | |
y = np.linspace(0, 1, 5)[:, None] | |
d = np.zeros(5) | |
match = '`kernel` must be one of' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d, kernel='test') | |
def test_epsilon_not_specified_error(self): | |
y = np.linspace(0, 1, 5)[:, None] | |
d = np.zeros(5) | |
for kernel in _AVAILABLE: | |
if kernel in _SCALE_INVARIANT: | |
continue | |
match = '`epsilon` must be specified' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d, kernel=kernel) | |
def test_x_not_2d_error(self): | |
y = np.linspace(0, 1, 5)[:, None] | |
x = np.linspace(0, 1, 5) | |
d = np.zeros(5) | |
match = '`x` must be a 2-dimensional array.' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d)(x) | |
def test_not_enough_observations_error(self): | |
y = np.linspace(0, 1, 1)[:, None] | |
d = np.zeros(1) | |
match = 'At least 2 data points are required' | |
with pytest.raises(ValueError, match=match): | |
self.build(y, d, kernel='thin_plate_spline') | |
def test_degree_warning(self): | |
y = np.linspace(0, 1, 5)[:, None] | |
d = np.zeros(5) | |
for kernel, deg in _NAME_TO_MIN_DEGREE.items(): | |
# Only test for kernels that its minimum degree is not 0. | |
if deg >= 1: | |
match = f'`degree` should not be below {deg}' | |
with pytest.warns(Warning, match=match): | |
self.build(y, d, epsilon=1.0, kernel=kernel, degree=deg-1) | |
def test_minus_one_degree(self): | |
# Make sure a degree of -1 is accepted without any warning. | |
y = np.linspace(0, 1, 5)[:, None] | |
d = np.zeros(5) | |
for kernel, _ in _NAME_TO_MIN_DEGREE.items(): | |
self.build(y, d, epsilon=1.0, kernel=kernel, degree=-1) | |
def test_rank_error(self): | |
# An error should be raised when `kernel` is "thin_plate_spline" and | |
# observations are 2-D and collinear. | |
y = np.array([[2.0, 0.0], [1.0, 0.0], [0.0, 0.0]]) | |
d = np.array([0.0, 0.0, 0.0]) | |
match = 'does not have full column rank' | |
with pytest.raises(LinAlgError, match=match): | |
self.build(y, d, kernel='thin_plate_spline')(y) | |
def test_single_point(self): | |
# Make sure interpolation still works with only one point (in 1, 2, and | |
# 3 dimensions). | |
for dim in [1, 2, 3]: | |
y = np.zeros((1, dim)) | |
d = np.ones((1,)) | |
f = self.build(y, d, kernel='linear')(y) | |
xp_assert_close(d, f) | |
def test_pickleable(self): | |
# Make sure we can pickle and unpickle the interpolant without any | |
# changes in the behavior. | |
seq = Halton(1, scramble=False, seed=np.random.RandomState(2305982309)) | |
x = 3*seq.random(50) | |
xitp = 3*seq.random(50) | |
y = _1d_test_function(x) | |
interp = self.build(x, y) | |
yitp1 = interp(xitp) | |
yitp2 = pickle.loads(pickle.dumps(interp))(xitp) | |
xp_assert_close(yitp1, yitp2, atol=1e-16) | |
class TestRBFInterpolatorNeighborsNone(_TestRBFInterpolator): | |
def build(self, *args, **kwargs): | |
return RBFInterpolator(*args, **kwargs) | |
def test_smoothing_limit_1d(self): | |
# For large smoothing parameters, the interpolant should approach a | |
# least squares fit of a polynomial with the specified degree. | |
seq = Halton(1, scramble=False, seed=np.random.RandomState()) | |
degree = 3 | |
smoothing = 1e8 | |
x = 3*seq.random(50) | |
xitp = 3*seq.random(50) | |
y = _1d_test_function(x) | |
yitp1 = self.build( | |
x, y, | |
degree=degree, | |
smoothing=smoothing | |
)(xitp) | |
P = _vandermonde(x, degree) | |
Pitp = _vandermonde(xitp, degree) | |
yitp2 = Pitp.dot(np.linalg.lstsq(P, y, rcond=None)[0]) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_smoothing_limit_2d(self): | |
# For large smoothing parameters, the interpolant should approach a | |
# least squares fit of a polynomial with the specified degree. | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
degree = 3 | |
smoothing = 1e8 | |
x = seq.random(100) | |
xitp = seq.random(100) | |
y = _2d_test_function(x) | |
yitp1 = self.build( | |
x, y, | |
degree=degree, | |
smoothing=smoothing | |
)(xitp) | |
P = _vandermonde(x, degree) | |
Pitp = _vandermonde(xitp, degree) | |
yitp2 = Pitp.dot(np.linalg.lstsq(P, y, rcond=None)[0]) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
class TestRBFInterpolatorNeighbors20(_TestRBFInterpolator): | |
# RBFInterpolator using 20 nearest neighbors. | |
def build(self, *args, **kwargs): | |
return RBFInterpolator(*args, **kwargs, neighbors=20) | |
def test_equivalent_to_rbf_interpolator(self): | |
seq = Halton(2, scramble=False, seed=np.random.RandomState()) | |
x = seq.random(100) | |
xitp = seq.random(100) | |
y = _2d_test_function(x) | |
yitp1 = self.build(x, y)(xitp) | |
yitp2 = [] | |
tree = cKDTree(x) | |
for xi in xitp: | |
_, nbr = tree.query(xi, 20) | |
yitp2.append(RBFInterpolator(x[nbr], y[nbr])(xi[None])[0]) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |
def test_concurrency(self): | |
# Check that no segfaults appear with concurrent access to | |
# RbfInterpolator | |
seq = Halton(2, scramble=False, seed=np.random.RandomState(0)) | |
x = seq.random(100) | |
xitp = seq.random(100) | |
y = _2d_test_function(x) | |
interp = self.build(x, y) | |
def worker_fn(_, interp, xp): | |
interp(xp) | |
_run_concurrent_barrier(10, worker_fn, interp, xitp) | |
class TestRBFInterpolatorNeighborsInf(TestRBFInterpolatorNeighborsNone): | |
# RBFInterpolator using neighbors=np.inf. This should give exactly the same | |
# results as neighbors=None, but it will be slower. | |
def build(self, *args, **kwargs): | |
return RBFInterpolator(*args, **kwargs, neighbors=np.inf) | |
def test_equivalent_to_rbf_interpolator(self): | |
seq = Halton(1, scramble=False, seed=np.random.RandomState()) | |
x = 3*seq.random(50) | |
xitp = 3*seq.random(50) | |
y = _1d_test_function(x) | |
yitp1 = self.build(x, y)(xitp) | |
yitp2 = RBFInterpolator(x, y)(xitp) | |
xp_assert_close(yitp1, yitp2, atol=1e-8) | |