File size: 9,314 Bytes
7885a28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
Test the func_inspect module.
"""

# Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org>
# Copyright (c) 2009 Gael Varoquaux
# License: BSD Style, 3 clauses.

import functools

from joblib.func_inspect import (
    _clean_win_chars,
    filter_args,
    format_signature,
    get_func_code,
    get_func_name,
)
from joblib.memory import Memory
from joblib.test.common import with_numpy
from joblib.testing import fixture, parametrize, raises


###############################################################################
# Module-level functions and fixture, for tests
def f(x, y=0):
    """Sample function: one required argument and one with a default."""


def g(x):
    """Sample function: a single required argument."""


def h(x, y=0, *args, **kwargs):
    """Sample function: named arguments plus ``*args`` and ``**kwargs``."""


def i(x=1):
    """Sample function: a single argument with a default value."""


def j(x, y, **kwargs):
    """Sample function: two required arguments plus ``**kwargs``."""


def k(*args, **kwargs):
    """Sample function: only ``*args`` and ``**kwargs``."""


def m1(x, *, y):
    """Sample function: one positional and one required keyword-only argument."""


def m2(x, *, y, z=3):
    """Sample function: keyword-only arguments, one of them with a default."""


@fixture(scope="module")
def cached_func(tmpdir_factory):
    """Provide a function decorated with ``Memory.cache`` for the tests.

    The tests must take care never to call the returned function, so that
    no cache directory gets created inside the temporary directory.
    """
    cache_root = tmpdir_factory.mktemp("joblib_test_func_inspect")
    memory = Memory(cache_root.strpath)

    @memory.cache
    def cached_func_inner(x):
        return x

    return cached_func_inner


class Klass:
    """Minimal class whose bound method ``f`` is inspected by the tests."""

    def f(self, x):
        """Return ``x`` unchanged."""
        return x


###############################################################################
# Tests


@parametrize(
    "func,args,filtered_args",
    [
        (f, [[], (1,)], {"x": 1, "y": 0}),
        (f, [["x"], (1,)], {"y": 0}),
        (f, [["y"], (0,)], {"x": 0}),
        (f, [["y"], (0,), {"y": 1}], {"x": 0}),
        (f, [["x", "y"], (0,)], {}),
        (f, [[], (0,), {"y": 1}], {"x": 0, "y": 1}),
        (f, [["y"], (), {"x": 2, "y": 1}], {"x": 2}),
        (g, [[], (), {"x": 1}], {"x": 1}),
        (i, [[], (2,)], {"x": 2}),
    ],
)
def test_filter_args(func, args, filtered_args):
    """Check filter_args on functions with plain and default arguments.

    ``args`` holds everything forwarded to filter_args after ``func``: the
    list of names to ignore, the positional arguments and, optionally, the
    keyword arguments.
    """
    remaining = filter_args(func, *args)
    assert remaining == filtered_args


def test_filter_args_method():
    """A bound method reports its instance under the ``self`` key."""
    instance = Klass()
    remaining = filter_args(instance.f, [], (1,))
    assert remaining == {"x": 1, "self": instance}


@parametrize(
    "func,args,filtered_args",
    [
        (h, [[], (1,)], {"x": 1, "y": 0, "*": [], "**": {}}),
        (h, [[], (1, 2, 3, 4)], {"x": 1, "y": 2, "*": [3, 4], "**": {}}),
        (h, [[], (1, 25), {"ee": 2}], {"x": 1, "y": 25, "*": [], "**": {"ee": 2}}),
        (h, [["*"], (1, 2, 25), {"ee": 2}], {"x": 1, "y": 2, "**": {"ee": 2}}),
    ],
)
def test_filter_varargs(func, args, filtered_args):
    """Check how filter_args reports extra positional and keyword arguments.

    The expected dictionaries store surplus positional arguments under
    ``"*"`` and surplus keyword arguments under ``"**"``.
    """
    remaining = filter_args(func, *args)
    assert remaining == filtered_args


# Additional cases for test_filter_kwargs, using the functions that declare
# keyword-only parameters (m1 and m2). Each entry has the same layout as the
# other parametrized cases: (func, arguments forwarded to filter_args,
# expected result).
test_filter_kwargs_extra_params = [
    (m1, [[], (1,), {"y": 2}], {"x": 1, "y": 2}),
    (m2, [[], (1,), {"y": 2}], {"x": 1, "y": 2, "z": 3}),
]


@parametrize(
    "func,args,filtered_args",
    [
        (k, [[], (1, 2), {"ee": 2}], {"*": [1, 2], "**": {"ee": 2}}),
        (k, [[], (3, 4)], {"*": [3, 4], "**": {}}),
    ]
    + test_filter_kwargs_extra_params,
)
def test_filter_kwargs(func, args, filtered_args):
    """Check filter_args on ``*args``/``**kwargs`` and keyword-only functions."""
    remaining = filter_args(func, *args)
    assert remaining == filtered_args


def test_filter_args_2():
    """Check ``**kwargs`` capture and the handling of ``functools.partial``."""
    expected_for_j = {"x": 1, "y": 2, "**": {"ee": 2}}
    assert filter_args(j, [], (1, 2), {"ee": 2}) == expected_for_j

    # filter_args has to special-case partial: the expected result is the
    # same whether or not "y" is listed among the ignored names.
    partial_f = functools.partial(f, 1)
    for ignored in ([], ["y"]):
        assert filter_args(partial_f, ignored, (1,)) == {"*": [1], "**": {}}


@parametrize("func,funcname", [(f, "f"), (g, "g"), (cached_func, "cached_func")])
def test_func_name(func, funcname):
    """Check the name part of what get_func_name returns."""
    # Check that we are not confused by decoration.
    # In the 'cached_func' case, the object under test is the module-level
    # 'cached_func' function itself, not the inner function it returns.
    reported_name = get_func_name(func)[1]
    assert reported_name == funcname


def test_func_name_on_inner_func(cached_func):
    """Check get_func_name on the function provided by the fixture."""
    # Check that we are not confused by decoration.
    # Here 'cached_func' is the 'cached_func_inner' function returned by
    # the 'cached_func' fixture.
    reported_name = get_func_name(cached_func)[1]
    assert reported_name == "cached_func_inner"


def test_func_name_collision_on_inner_func():
    """Same-named inner functions must not share a (module, name) pair."""
    # Check that two functions, each defining an inner function with the
    # same name, do not cause a (module, name) collision.

    def f():
        def inner_func():
            return  # pragma: no cover

        return get_func_name(inner_func)

    def g():
        def inner_func():
            return  # pragma: no cover

        return get_func_name(inner_func)

    first_module, first_name = f()
    second_module, second_name = g()

    # Both inner functions carry the same name...
    assert first_name == second_name
    # ...but the module part must tell them apart.
    assert first_module != second_module


def test_func_inspect_errors():
    """Check that func_inspect is robust and works on unusual objects."""
    # Method of a built-in type: the name is found, but get_func_code
    # reports no source file (None) and a first line of -1.
    builtin_method = "a".lower
    assert get_func_name(builtin_method)[-1] == "lower"
    assert get_func_code(builtin_method)[1:] == (None, -1)

    # Path of this test module, as expected from get_func_code.
    this_file = __file__.replace(".pyc", ".py")

    # A lambda is needed on purpose here, hence the lint exemption.
    anonymous = lambda x: x  # noqa: E731
    assert get_func_name(anonymous, win_characters=False)[-1] == "<lambda>"
    assert get_func_code(anonymous)[1] == this_file

    # Simulate a function defined in __main__
    anonymous.__module__ = "__main__"
    assert get_func_name(anonymous, win_characters=False)[-1] == "<lambda>"
    assert get_func_code(anonymous)[1] == this_file


def func_with_kwonly_args(a, b, *, kw1="kw1", kw2="kw2"):
    """Sample function: two positional and two keyword-only arguments."""


def func_with_signature(a: int, b: int) -> None:
    """Sample function: fully annotated signature."""


def test_filter_args_edge_cases():
    """Check filter_args on keyword-only and annotated signatures."""
    kwonly_kwargs = {"kw1": 3, "kw2": 4}
    everything = filter_args(func_with_kwonly_args, [], (1, 2), kwonly_kwargs)
    assert everything == {"a": 1, "b": 2, "kw1": 3, "kw2": 4}

    # A keyword-only parameter must not be supplied positionally: a third
    # positional value would land on 'kw1', and filter_args is expected to
    # reject it with a ValueError.
    with raises(ValueError) as excinfo:
        filter_args(func_with_kwonly_args, [], (1, 2, 3), {"kw2": 2})
    excinfo.match("Keyword-only parameter 'kw1' was passed as positional parameter")

    # Ignoring one positional and one keyword-only parameter.
    partly_ignored = filter_args(
        func_with_kwonly_args, ["b", "kw2"], (1, 2), {"kw1": 3, "kw2": 4}
    )
    assert partly_ignored == {"a": 1, "kw1": 3}

    # Annotations in the signature must not get in the way.
    annotated = filter_args(func_with_signature, ["b"], (1, 2))
    assert annotated == {"a": 1}


def test_bound_methods():
    """Make sure that calling the same method on two different instances
    of the same class does resolve to different signatures.
    """
    first, second = Klass(), Klass()
    first_args = filter_args(first.f, [], (1,))
    second_args = filter_args(second.f, [], (1,))
    assert first_args != second_args


@parametrize(
    "exception,regex,func,args",
    [
        (
            ValueError,
            "ignore_lst must be a list of parameters to ignore",
            f,
            ["bar", (None,)],
        ),
        (
            ValueError,
            r"Ignore list: argument \'(.*)\' is not defined",
            g,
            [["bar"], (None,)],
        ),
        (ValueError, "Wrong number of arguments", h, [[]]),
    ],
)
def test_filter_args_error_msg(exception, regex, func, args):
    """Check that invalid calls to filter_args fail with a helpful message.

    Each case gives the expected exception type, a regular expression the
    message must match, and the function and arguments handed to filter_args.
    """
    with raises(exception) as caught:
        filter_args(func, *args)
    caught.match(regex)


def test_filter_args_no_kwargs_mutation():
    """Non-regression test against 0.12.0 changes.

    https://github.com/joblib/joblib/pull/75

    Make sure filter_args doesn't mutate the kwargs dict that gets passed in.
    """
    reference = {"x": 0}
    kwargs = dict(reference)
    filter_args(g, [], [], kwargs)
    # The caller's dictionary must come back untouched.
    assert kwargs == reference


def test_clean_win_chars():
    """Check that _clean_win_chars leaves none of the listed characters."""
    windows_path = r"C:\foo\bar\main.py"
    cleaned = _clean_win_chars(windows_path)
    unwanted = ("\\", ":", "<", ">", "!")
    leftovers = [char for char in unwanted if char in cleaned]
    assert not leftovers


@parametrize(
    "func,args,kwargs,sgn_expected",
    [
        (g, [list(range(5))], {}, "g([0, 1, 2, 3, 4])"),
        (k, [1, 2, (3, 4)], {"y": True}, "k(1, 2, (3, 4), y=True)"),
    ],
)
def test_format_signature(func, args, kwargs, sgn_expected):
    """Check the textual signature produced by format_signature."""
    # Only the second element of the result is checked here.
    _, sgn_result = format_signature(func, *args, **kwargs)
    assert sgn_result == sgn_expected


def test_format_signature_long_arguments():
    """Check that very long arguments are shortened in the signature."""
    shortening_threshold = 1500
    # Shortening gets an argument down to 700 characters, but the signature
    # also contains the function name and a few extras such as the dots of
    # the ellipsis, hence the small margin.
    shortening_target = 700 + 10
    nb_args = 5

    long_arg = "a" * shortening_threshold

    # A single long positional argument.
    _, signature = format_signature(h, long_arg)
    assert len(signature) < shortening_target

    # Several long positional arguments.
    many_args = [long_arg] * nb_args
    _, signature = format_signature(h, *many_args)
    assert len(signature) < shortening_target * nb_args

    # Several long keyword arguments.
    many_kwargs = {str(idx): value for idx, value in enumerate(many_args)}
    _, signature = format_signature(h, **many_kwargs)
    assert len(signature) < shortening_target * nb_args

    # Positional and keyword arguments together.
    _, signature = format_signature(h, *many_args, **many_kwargs)
    assert len(signature) < shortening_target * 2 * nb_args


# NOTE(review): this test has no body besides its docstring, so it makes no
# assertion about format_signature yet. Presumably a placeholder; confirm
# whether numpy-specific checks are still to be written.
@with_numpy
def test_format_signature_numpy():
    """Test the format signature formatting with numpy."""


def test_special_source_encoding():
    """Check get_func_code on a function from the special-encoding module."""
    from joblib.test.test_func_inspect_special_encoding import big5_f

    code_info = get_func_code(big5_f)
    func_code, source_file, first_line = code_info

    # The definition is expected to start on line 5 of that module.
    assert first_line == 5
    assert "def big5_f():" in func_code
    assert "test_func_inspect_special_encoding" in source_file


def _get_code():
    """Return the source code of ``big5_f`` as reported by get_func_code."""
    from joblib.test.test_func_inspect_special_encoding import big5_f

    # The source code is the first element of the result.
    func_code = get_func_code(big5_f)[0]
    return func_code


def test_func_code_consistency():
    """Every parallel call to _get_code must return the same source code."""
    from joblib.parallel import Parallel, delayed

    tasks = (delayed(_get_code)() for _ in range(5))
    codes = Parallel(n_jobs=2)(tasks)
    # All five results must collapse to a single distinct value.
    assert len(set(codes)) == 1