File size: 5,255 Bytes
7885a28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import os
from joblib._parallel_backends import (
LokyBackend,
MultiprocessingBackend,
ThreadingBackend,
)
from joblib.parallel import (
BACKENDS,
DEFAULT_BACKEND,
EXTERNAL_BACKENDS,
Parallel,
delayed,
parallel_backend,
parallel_config,
)
from joblib.test.common import np, with_multiprocessing, with_numpy
from joblib.test.test_parallel import check_memmap
from joblib.testing import parametrize, raises
@parametrize("context", [parallel_config, parallel_backend])
def test_global_parallel_backend(context):
default = Parallel()._backend
pb = context("threading")
try:
assert isinstance(Parallel()._backend, ThreadingBackend)
finally:
pb.unregister()
assert type(Parallel()._backend) is type(default)
@parametrize("context", [parallel_config, parallel_backend])
def test_external_backends(context):
def register_foo():
BACKENDS["foo"] = ThreadingBackend
EXTERNAL_BACKENDS["foo"] = register_foo
try:
with context("foo"):
assert isinstance(Parallel()._backend, ThreadingBackend)
finally:
del EXTERNAL_BACKENDS["foo"]
@with_numpy
@with_multiprocessing
def test_parallel_config_no_backend(tmpdir):
# Check that parallel_config allows to change the config
# even if no backend is set.
with parallel_config(n_jobs=2, max_nbytes=1, temp_folder=tmpdir):
with Parallel(prefer="processes") as p:
assert isinstance(p._backend, LokyBackend)
assert p.n_jobs == 2
# Checks that memmapping is enabled
p(delayed(check_memmap)(a) for a in [np.random.random(10)] * 2)
assert len(os.listdir(tmpdir)) > 0
@with_numpy
@with_multiprocessing
def test_parallel_config_params_explicit_set(tmpdir):
with parallel_config(n_jobs=3, max_nbytes=1, temp_folder=tmpdir):
with Parallel(n_jobs=2, prefer="processes", max_nbytes="1M") as p:
assert isinstance(p._backend, LokyBackend)
assert p.n_jobs == 2
# Checks that memmapping is disabled
with raises(TypeError, match="Expected np.memmap instance"):
p(delayed(check_memmap)(a) for a in [np.random.random(10)] * 2)
@parametrize("param", ["prefer", "require"])
def test_parallel_config_bad_params(param):
# Check that an error is raised when setting a wrong backend
# hint or constraint
with raises(ValueError, match=f"{param}=wrong is not a valid"):
with parallel_config(**{param: "wrong"}):
Parallel()
def test_parallel_config_constructor_params():
# Check that an error is raised when backend is None
# but backend constructor params are given
with raises(ValueError, match="only supported when backend is not None"):
with parallel_config(inner_max_num_threads=1):
pass
with raises(ValueError, match="only supported when backend is not None"):
with parallel_config(backend_param=1):
pass
with raises(ValueError, match="only supported when backend is a string"):
with parallel_config(backend=BACKENDS[DEFAULT_BACKEND], backend_param=1):
pass
def test_parallel_config_nested():
# Check that nested configuration retrieves the info from the
# parent config and do not reset them.
with parallel_config(n_jobs=2):
p = Parallel()
assert isinstance(p._backend, BACKENDS[DEFAULT_BACKEND])
assert p.n_jobs == 2
with parallel_config(backend="threading"):
with parallel_config(n_jobs=2):
p = Parallel()
assert isinstance(p._backend, ThreadingBackend)
assert p.n_jobs == 2
with parallel_config(verbose=100):
with parallel_config(n_jobs=2):
p = Parallel()
assert p.verbose == 100
assert p.n_jobs == 2
@with_numpy
@with_multiprocessing
@parametrize(
"backend",
["multiprocessing", "threading", MultiprocessingBackend(), ThreadingBackend()],
)
@parametrize("context", [parallel_config, parallel_backend])
def test_threadpool_limitation_in_child_context_error(context, backend):
with raises(AssertionError, match=r"does not acc.*inner_max_num_threads"):
context(backend, inner_max_num_threads=1)
@parametrize("context", [parallel_config, parallel_backend])
def test_parallel_n_jobs_none(context):
# Check that n_jobs=None is interpreted as "unset" in Parallel
# non regression test for #1473
with context(backend="threading", n_jobs=2):
with Parallel(n_jobs=None) as p:
assert p.n_jobs == 2
with context(backend="threading"):
default_n_jobs = Parallel().n_jobs
with Parallel(n_jobs=None) as p:
assert p.n_jobs == default_n_jobs
@parametrize("context", [parallel_config, parallel_backend])
def test_parallel_config_n_jobs_none(context):
# Check that n_jobs=None is interpreted as "explicitly set" in
# parallel_(config/backend)
# non regression test for #1473
with context(backend="threading", n_jobs=2):
with context(backend="threading", n_jobs=None):
# n_jobs=None resets n_jobs to backend's default
with Parallel() as p:
assert p.n_jobs == 1
|