File size: 2,567 Bytes
7885a28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import warnings


def _viztracer_init(init_kwargs):
    """Create and start a viztracer tracer built from ``init_kwargs``.

    Meant to run as a worker-process initializer: ``init_kwargs`` is
    forwarded as keyword arguments to ``viztracer.VizTracer``.
    """
    # Imported lazily so that viztracer is only required when this
    # initializer is actually executed.
    import viztracer

    worker_tracer = viztracer.VizTracer(**init_kwargs)
    worker_tracer.register_exit()
    worker_tracer.start()


def _make_viztracer_initializer_and_initargs():
    try:
        import viztracer

        tracer = viztracer.get_tracer()
        if tracer is not None and getattr(tracer, "enable", False):
            # Profiler is active: introspect its configuration to
            # initialize the workers with the same configuration.
            return _viztracer_init, (tracer.init_kwargs,)
    except ImportError:
        # viztracer is not installed: nothing to do
        pass
    except Exception as e:
        # In case viztracer's API evolve, we do not want to crash loky but
        # we want to know about it to be able to update loky.
        warnings.warn(f"Unable to introspect viztracer state: {e}")
    return None, ()


class _ChainedInitializer:
    """Compound worker initializer

    This is meant to be used in conjunction with _chain_initializers to
    produce  the necessary chained_args list to be passed to __call__.
    """

    def __init__(self, initializers):
        self._initializers = initializers

    def __call__(self, *chained_args):
        for initializer, args in zip(self._initializers, chained_args):
            initializer(*args)


def _chain_initializers(initializer_and_args):
    """Convenience helper to combine a sequence of initializers.

    If some initializers are None, they are filtered out.
    """
    filtered_initializers = []
    filtered_initargs = []
    for initializer, initargs in initializer_and_args:
        if initializer is not None:
            filtered_initializers.append(initializer)
            filtered_initargs.append(initargs)

    if not filtered_initializers:
        return None, ()
    elif len(filtered_initializers) == 1:
        return filtered_initializers[0], filtered_initargs[0]
    else:
        return _ChainedInitializer(filtered_initializers), filtered_initargs


def _prepare_initializer(initializer, initargs):
    if initializer is not None and not callable(initializer):
        raise TypeError(
            f"initializer must be a callable, got: {initializer!r}"
        )

    # Introspect runtime to determine if we need to propagate the viztracer
    # profiler information to the workers:
    return _chain_initializers(
        [
            (initializer, initargs),
            _make_viztracer_initializer_and_initargs(),
        ]
    )