File size: 2,567 Bytes
7885a28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
import warnings
def _viztracer_init(init_kwargs):
"""Initialize viztracer's profiler in worker processes"""
from viztracer import VizTracer
tracer = VizTracer(**init_kwargs)
tracer.register_exit()
tracer.start()
def _make_viztracer_initializer_and_initargs():
try:
import viztracer
tracer = viztracer.get_tracer()
if tracer is not None and getattr(tracer, "enable", False):
# Profiler is active: introspect its configuration to
# initialize the workers with the same configuration.
return _viztracer_init, (tracer.init_kwargs,)
except ImportError:
# viztracer is not installed: nothing to do
pass
except Exception as e:
# In case viztracer's API evolve, we do not want to crash loky but
# we want to know about it to be able to update loky.
warnings.warn(f"Unable to introspect viztracer state: {e}")
return None, ()
class _ChainedInitializer:
"""Compound worker initializer
This is meant to be used in conjunction with _chain_initializers to
produce the necessary chained_args list to be passed to __call__.
"""
def __init__(self, initializers):
self._initializers = initializers
def __call__(self, *chained_args):
for initializer, args in zip(self._initializers, chained_args):
initializer(*args)
def _chain_initializers(initializer_and_args):
"""Convenience helper to combine a sequence of initializers.
If some initializers are None, they are filtered out.
"""
filtered_initializers = []
filtered_initargs = []
for initializer, initargs in initializer_and_args:
if initializer is not None:
filtered_initializers.append(initializer)
filtered_initargs.append(initargs)
if not filtered_initializers:
return None, ()
elif len(filtered_initializers) == 1:
return filtered_initializers[0], filtered_initargs[0]
else:
return _ChainedInitializer(filtered_initializers), filtered_initargs
def _prepare_initializer(initializer, initargs):
if initializer is not None and not callable(initializer):
raise TypeError(
f"initializer must be a callable, got: {initializer!r}"
)
# Introspect runtime to determine if we need to propagate the viztracer
# profiler information to the workers:
return _chain_initializers(
[
(initializer, initargs),
_make_viztracer_initializer_and_initargs(),
]
)
|