"""Custom implementation of multiprocessing.Pool with custom pickler. | |
This module provides efficient ways of working with data stored in | |
shared memory with numpy.memmap arrays without inducing any memory | |
copy between the parent and child processes. | |
This module should not be imported if multiprocessing is not | |
available as it implements subclasses of multiprocessing Pool | |
that uses a custom alternative to SimpleQueue. | |
""" | |
# Author: Olivier Grisel <[email protected]>
# Copyright: 2012, Olivier Grisel
# License: BSD 3 clause

import copyreg
import sys
import warnings
from time import sleep

try:
    WindowsError
except NameError:
    WindowsError = type(None)

from pickle import Pickler
from pickle import HIGHEST_PROTOCOL
from io import BytesIO

from ._memmapping_reducer import get_memmapping_reducers
from ._memmapping_reducer import TemporaryResourcesManager
from ._multiprocessing_helpers import mp, assert_spawning

# We need the class definition to derive from it, not the multiprocessing.Pool
# factory function
from multiprocessing.pool import Pool

try:
    import numpy as np
except ImportError:
    np = None


###############################################################################
# Enable custom pickling in Pool queues

class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.

    TODO python2_drop: can this be simplified?

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication,
    hence no backward compatibility is required.

    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as would return a `__reduce__` method. See the
    standard library documentation on pickling for more details.

    """

    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.7
    # to 3.2. For Python 3.3+ we leverage the new dispatch_table
    # feature from https://bugs.python.org/issue14166 that makes it possible
    # to use the C implementation of the Pickler, which is faster.

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        if hasattr(Pickler, 'dispatch'):
            # Make the dispatch registry an instance level attribute instead
            # of a reference to the class dictionary under Python 2
            self.dispatch = Pickler.dispatch.copy()
        else:
            # Under Python 3 initialize the dispatch table with a copy of the
            # default registry
            self.dispatch_table = copyreg.dispatch_table.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        """Attach a reducer function to a given type in the dispatch table."""
        if hasattr(Pickler, 'dispatch'):
            # Python 2 pickler dispatching is not explicitly customizable.
            # Let us use a closure to work around this limitation.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)
            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func
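

# A minimal usage sketch (editor's illustration, not part of the original
# module): route ``complex`` instances through a custom reducer. ``complex``
# is just a convenient stand-in for a type that needs special treatment,
# such as numpy.memmap. The reducer returns a ``(constructor, args)`` pair,
# exactly as a ``__reduce__`` method would.
def _reduce_complex(obj):
    # Rebuild the complex number from its real and imaginary parts.
    return complex, (obj.real, obj.imag)


def _example_customizable_pickler():
    from pickle import loads

    buffer = BytesIO()
    pickler = CustomizablePickler(buffer, reducers={complex: _reduce_complex})
    pickler.dump(3 + 4j)
    # The stream produced with the custom reducer is readable by a plain
    # unpickler.
    assert loads(buffer.getvalue()) == 3 + 4j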


class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory mapped datastructures.

    `reducers` is expected to be a dict with key / values being
    `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as would
    return a `__reduce__` method.

    See the standard library documentation on pickling for more details.
    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock,
                self._reducers)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock,
         self._reducers) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:
            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())
            self._send = send
        else:
            self._send = send = self._writer.send

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = send
        else:
            wlock_acquire, wlock_release = (
                self._wlock.acquire, self._wlock.release)

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put
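

# A minimal single-process sketch (editor's illustration, not part of the
# original module), reusing the hypothetical ``_reduce_complex`` reducer
# defined above: ``put`` pickles with the custom reducer before writing to
# the pipe, while ``get`` reads the message back with the regular unpickler.
def _example_pickling_queue():
    import multiprocessing

    context = multiprocessing.get_context()
    queue = CustomizablePicklingQueue(
        context, reducers={complex: _reduce_complex})
    queue.put(1 + 2j)
    assert queue.get() == 1 + 2j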


class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as would return a `__reduce__` method.

    See the standard library documentation about pickling for more details.
    """

    def __init__(self, processes=None, forward_reducers=None,
                 backward_reducers=None, **kwargs):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, '_ctx', mp)
        self._inqueue = CustomizablePicklingQueue(
            context, self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(
            context, self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
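

# A minimal usage sketch (editor's illustration, not part of the original
# module): a PicklingPool whose forward queue applies the hypothetical
# ``_reduce_complex`` reducer to arguments on their way to the workers.
# Return values travel back through the default pickling path since no
# backward reducers are given.
def _example_pickling_pool():
    pool = PicklingPool(processes=2,
                        forward_reducers={complex: _reduce_complex})
    try:
        # abs() of the two complex numbers: [5.0, 13.0]
        return pool.map(abs, [3 + 4j, 5 + 12j])
    finally:
        pool.terminate()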


class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode except for the 'w+' mode that is automatically
    transformed as 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: str or callable, optional
        If str:
            Folder to be used by the pool for memmapping large arrays
            for sharing memory with worker processes. If None, this will try
            in order:
            - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
              variable,
            - /dev/shm if the folder exists and is writable: this is a RAMdisk
              filesystem available by default on modern Linux distributions,
            - the default system temporary folder that can be overridden
              with TMP, TMPDIR or TEMP environment variables, typically /tmp
              under Unix operating systems.
        If callable:
            A callable in charge of dynamically resolving a temporary folder
            for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from the main process to the
        worker processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        main process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocess is handled (pickling or memmapping).
    prewarm: bool or str, optional, "auto" by default
        If True, force a read on newly memmapped arrays to make sure that the
        OS pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes. If "auto" (by default), prewarm is set to True, unless the
        Linux shared memory partition /dev/shm is available and used as temp
        folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as would return a `__reduce__`
    method. See the standard library documentation on pickling for more
    details.
    """

    def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6,
                 mmap_mode='r', forward_reducers=None, backward_reducers=None,
                 verbose=0, context_id=None, prewarm=False, **kwargs):
        if context_id is not None:
            warnings.warn('context_id is deprecated and ignored in joblib'
                          ' 0.9.4 and will be removed in 0.11',
                          DeprecationWarning)
        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager
        # The usage of a temp_folder_resolver over a simple temp_folder is
        # superfluous for multiprocessing pools, as they don't get reused, see
        # get_memmapping_executor for more details. We still use it for code
        # simplicity.
        forward_reducers, backward_reducers = get_memmapping_reducers(
            temp_folder_resolver=manager.resolve_temp_folder_name,
            max_nbytes=max_nbytes, mmap_mode=mmap_mode,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers, verbose=verbose,
            unlink_on_gc_collect=False, prewarm=prewarm)
        poolargs = dict(
            processes=processes,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers)
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(**poolargs)

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Workaround occasional "[Error 5] Access is denied"
                    # issue when trying to terminate a process under Windows.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn("Failed to terminate worker processes"
                                      " in multiprocessing pool: %r" % e)

        # Clean up the temporary resources as the workers should now be off.
        self._temp_folder_manager._clean_temporary_resources()

    @property
    def _temp_folder(self):
        # Legacy property used in tests; could be removed if we refactored
        # the memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # We cache this property because it is called late in the tests - at
        # this point, all contexts have been unregistered, and
        # resolve_temp_folder_name raises an error.
        if getattr(self, '_cached_temp_folder', None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = \
                self._temp_folder_manager.resolve_temp_folder_name()
            return self._cached_temp_folder
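

# A minimal usage sketch (editor's illustration, not part of the original
# module): with a small ``max_nbytes`` threshold, the input array is dumped
# to a temporary memmap once and shared by both workers instead of being
# pickled and copied into each of them. Requires numpy.
def _example_memmapping_pool():
    if np is None:
        raise RuntimeError('numpy is required for this sketch')
    data = np.arange(100000, dtype=np.float64)  # ~800 kB, above the threshold
    pool = MemmappingPool(processes=2, max_nbytes=1000, mmap_mode='r')
    try:
        # Each worker receives a read-only memmap view of the same data.
        return pool.map(np.sum, [data, data])
    finally:
        # terminate() also collects the temporary folder used by the pool.
        pool.terminate()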