File size: 5,255 Bytes
7885a28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import os

from joblib._parallel_backends import (
    LokyBackend,
    MultiprocessingBackend,
    ThreadingBackend,
)
from joblib.parallel import (
    BACKENDS,
    DEFAULT_BACKEND,
    EXTERNAL_BACKENDS,
    Parallel,
    delayed,
    parallel_backend,
    parallel_config,
)
from joblib.test.common import np, with_multiprocessing, with_numpy
from joblib.test.test_parallel import check_memmap
from joblib.testing import parametrize, raises


@parametrize("context", [parallel_config, parallel_backend])
def test_global_parallel_backend(context):
    default = Parallel()._backend

    pb = context("threading")
    try:
        assert isinstance(Parallel()._backend, ThreadingBackend)
    finally:
        pb.unregister()
    assert type(Parallel()._backend) is type(default)


@parametrize("context", [parallel_config, parallel_backend])
def test_external_backends(context):
    def register_foo():
        BACKENDS["foo"] = ThreadingBackend

    EXTERNAL_BACKENDS["foo"] = register_foo
    try:
        with context("foo"):
            assert isinstance(Parallel()._backend, ThreadingBackend)
    finally:
        del EXTERNAL_BACKENDS["foo"]


@with_numpy
@with_multiprocessing
def test_parallel_config_no_backend(tmpdir):
    # Check that parallel_config allows to change the config
    # even if no backend is set.
    with parallel_config(n_jobs=2, max_nbytes=1, temp_folder=tmpdir):
        with Parallel(prefer="processes") as p:
            assert isinstance(p._backend, LokyBackend)
            assert p.n_jobs == 2

            # Checks that memmapping is enabled
            p(delayed(check_memmap)(a) for a in [np.random.random(10)] * 2)
            assert len(os.listdir(tmpdir)) > 0


@with_numpy
@with_multiprocessing
def test_parallel_config_params_explicit_set(tmpdir):
    with parallel_config(n_jobs=3, max_nbytes=1, temp_folder=tmpdir):
        with Parallel(n_jobs=2, prefer="processes", max_nbytes="1M") as p:
            assert isinstance(p._backend, LokyBackend)
            assert p.n_jobs == 2

            # Checks that memmapping is disabled
            with raises(TypeError, match="Expected np.memmap instance"):
                p(delayed(check_memmap)(a) for a in [np.random.random(10)] * 2)


@parametrize("param", ["prefer", "require"])
def test_parallel_config_bad_params(param):
    # Check that an error is raised when setting a wrong backend
    # hint or constraint
    with raises(ValueError, match=f"{param}=wrong is not a valid"):
        with parallel_config(**{param: "wrong"}):
            Parallel()


def test_parallel_config_constructor_params():
    # Check that an error is raised when backend is None
    # but backend constructor params are given
    with raises(ValueError, match="only supported when backend is not None"):
        with parallel_config(inner_max_num_threads=1):
            pass

    with raises(ValueError, match="only supported when backend is not None"):
        with parallel_config(backend_param=1):
            pass

    with raises(ValueError, match="only supported when backend is a string"):
        with parallel_config(backend=BACKENDS[DEFAULT_BACKEND], backend_param=1):
            pass


def test_parallel_config_nested():
    # Check that nested configuration retrieves the info from the
    # parent config and do not reset them.

    with parallel_config(n_jobs=2):
        p = Parallel()
        assert isinstance(p._backend, BACKENDS[DEFAULT_BACKEND])
        assert p.n_jobs == 2

    with parallel_config(backend="threading"):
        with parallel_config(n_jobs=2):
            p = Parallel()
            assert isinstance(p._backend, ThreadingBackend)
            assert p.n_jobs == 2

    with parallel_config(verbose=100):
        with parallel_config(n_jobs=2):
            p = Parallel()
            assert p.verbose == 100
            assert p.n_jobs == 2


@with_numpy
@with_multiprocessing
@parametrize(
    "backend",
    ["multiprocessing", "threading", MultiprocessingBackend(), ThreadingBackend()],
)
@parametrize("context", [parallel_config, parallel_backend])
def test_threadpool_limitation_in_child_context_error(context, backend):
    with raises(AssertionError, match=r"does not acc.*inner_max_num_threads"):
        context(backend, inner_max_num_threads=1)


@parametrize("context", [parallel_config, parallel_backend])
def test_parallel_n_jobs_none(context):
    # Check that n_jobs=None is interpreted as "unset" in Parallel
    # non regression test for #1473
    with context(backend="threading", n_jobs=2):
        with Parallel(n_jobs=None) as p:
            assert p.n_jobs == 2

    with context(backend="threading"):
        default_n_jobs = Parallel().n_jobs
        with Parallel(n_jobs=None) as p:
            assert p.n_jobs == default_n_jobs


@parametrize("context", [parallel_config, parallel_backend])
def test_parallel_config_n_jobs_none(context):
    # Check that n_jobs=None is interpreted as "explicitly set" in
    # parallel_(config/backend)
    # non regression test for #1473
    with context(backend="threading", n_jobs=2):
        with context(backend="threading", n_jobs=None):
            # n_jobs=None resets n_jobs to backend's default
            with Parallel() as p:
                assert p.n_jobs == 1