Spaces:
Running
Running
# | |
# Module which deals with pickling of objects. | |
# | |
# multiprocessing/reduction.py | |
# | |
# Copyright (c) 2006-2008, R Oudkerk | |
# Licensed to PSF under a Contributor Agreement. | |
# | |
from abc import ABCMeta | |
import copyreg | |
import functools | |
import io | |
import os | |
import pickle | |
import socket | |
import sys | |
from . import context | |
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] | |
HAVE_SEND_HANDLE = (sys.platform == 'win32' or | |
(hasattr(socket, 'CMSG_LEN') and | |
hasattr(socket, 'SCM_RIGHTS') and | |
hasattr(socket.socket, 'sendmsg'))) | |
# | |
# Pickler subclass | |
# | |
class ForkingPickler(pickle.Pickler): | |
'''Pickler subclass used by multiprocessing.''' | |
_extra_reducers = {} | |
_copyreg_dispatch_table = copyreg.dispatch_table | |
def __init__(self, *args): | |
super().__init__(*args) | |
self.dispatch_table = self._copyreg_dispatch_table.copy() | |
self.dispatch_table.update(self._extra_reducers) | |
def register(cls, type, reduce): | |
'''Register a reduce function for a type.''' | |
cls._extra_reducers[type] = reduce | |
def dumps(cls, obj, protocol=None): | |
buf = io.BytesIO() | |
cls(buf, protocol).dump(obj) | |
return buf.getbuffer() | |
loads = pickle.loads | |
register = ForkingPickler.register | |
def dump(obj, file, protocol=None): | |
'''Replacement for pickle.dump() using ForkingPickler.''' | |
ForkingPickler(file, protocol).dump(obj) | |
# | |
# Platform specific definitions | |
# | |
if sys.platform == 'win32': | |
# Windows | |
__all__ += ['DupHandle', 'duplicate', 'steal_handle'] | |
import _winapi | |
def duplicate(handle, target_process=None, inheritable=False, | |
*, source_process=None): | |
'''Duplicate a handle. (target_process is a handle not a pid!)''' | |
current_process = _winapi.GetCurrentProcess() | |
if source_process is None: | |
source_process = current_process | |
if target_process is None: | |
target_process = current_process | |
return _winapi.DuplicateHandle( | |
source_process, handle, target_process, | |
0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) | |
def steal_handle(source_pid, handle): | |
'''Steal a handle from process identified by source_pid.''' | |
source_process_handle = _winapi.OpenProcess( | |
_winapi.PROCESS_DUP_HANDLE, False, source_pid) | |
try: | |
return _winapi.DuplicateHandle( | |
source_process_handle, handle, | |
_winapi.GetCurrentProcess(), 0, False, | |
_winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE) | |
finally: | |
_winapi.CloseHandle(source_process_handle) | |
def send_handle(conn, handle, destination_pid): | |
'''Send a handle over a local connection.''' | |
dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) | |
conn.send(dh) | |
def recv_handle(conn): | |
'''Receive a handle over a local connection.''' | |
return conn.recv().detach() | |
class DupHandle(object): | |
'''Picklable wrapper for a handle.''' | |
def __init__(self, handle, access, pid=None): | |
if pid is None: | |
# We just duplicate the handle in the current process and | |
# let the receiving process steal the handle. | |
pid = os.getpid() | |
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) | |
try: | |
self._handle = _winapi.DuplicateHandle( | |
_winapi.GetCurrentProcess(), | |
handle, proc, access, False, 0) | |
finally: | |
_winapi.CloseHandle(proc) | |
self._access = access | |
self._pid = pid | |
def detach(self): | |
'''Get the handle. This should only be called once.''' | |
# retrieve handle from process which currently owns it | |
if self._pid == os.getpid(): | |
# The handle has already been duplicated for this process. | |
return self._handle | |
# We must steal the handle from the process whose pid is self._pid. | |
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, | |
self._pid) | |
try: | |
return _winapi.DuplicateHandle( | |
proc, self._handle, _winapi.GetCurrentProcess(), | |
self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) | |
finally: | |
_winapi.CloseHandle(proc) | |
else: | |
# Unix | |
__all__ += ['DupFd', 'sendfds', 'recvfds'] | |
import array | |
# On MacOSX we should acknowledge receipt of fds -- see Issue14669 | |
ACKNOWLEDGE = sys.platform == 'darwin' | |
def sendfds(sock, fds): | |
'''Send an array of fds over an AF_UNIX socket.''' | |
fds = array.array('i', fds) | |
msg = bytes([len(fds) % 256]) | |
sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) | |
if ACKNOWLEDGE and sock.recv(1) != b'A': | |
raise RuntimeError('did not receive acknowledgement of fd') | |
def recvfds(sock, size): | |
'''Receive an array of fds over an AF_UNIX socket.''' | |
a = array.array('i') | |
bytes_size = a.itemsize * size | |
msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size)) | |
if not msg and not ancdata: | |
raise EOFError | |
try: | |
if ACKNOWLEDGE: | |
sock.send(b'A') | |
if len(ancdata) != 1: | |
raise RuntimeError('received %d items of ancdata' % | |
len(ancdata)) | |
cmsg_level, cmsg_type, cmsg_data = ancdata[0] | |
if (cmsg_level == socket.SOL_SOCKET and | |
cmsg_type == socket.SCM_RIGHTS): | |
if len(cmsg_data) % a.itemsize != 0: | |
raise ValueError | |
a.frombytes(cmsg_data) | |
if len(a) % 256 != msg[0]: | |
raise AssertionError( | |
"Len is {0:n} but msg[0] is {1!r}".format( | |
len(a), msg[0])) | |
return list(a) | |
except (ValueError, IndexError): | |
pass | |
raise RuntimeError('Invalid data received') | |
def send_handle(conn, handle, destination_pid): | |
'''Send a handle over a local connection.''' | |
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: | |
sendfds(s, [handle]) | |
def recv_handle(conn): | |
'''Receive a handle over a local connection.''' | |
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: | |
return recvfds(s, 1)[0] | |
def DupFd(fd): | |
'''Return a wrapper for an fd.''' | |
popen_obj = context.get_spawning_popen() | |
if popen_obj is not None: | |
return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) | |
elif HAVE_SEND_HANDLE: | |
from . import resource_sharer | |
return resource_sharer.DupFd(fd) | |
else: | |
raise ValueError('SCM_RIGHTS appears not to be available') | |
# | |
# Try making some callable types picklable | |
# | |
def _reduce_method(m): | |
if m.__self__ is None: | |
return getattr, (m.__class__, m.__func__.__name__) | |
else: | |
return getattr, (m.__self__, m.__func__.__name__) | |
class _C: | |
def f(self): | |
pass | |
register(type(_C().f), _reduce_method) | |
def _reduce_method_descriptor(m): | |
return getattr, (m.__objclass__, m.__name__) | |
register(type(list.append), _reduce_method_descriptor) | |
register(type(int.__add__), _reduce_method_descriptor) | |
def _reduce_partial(p): | |
return _rebuild_partial, (p.func, p.args, p.keywords or {}) | |
def _rebuild_partial(func, args, keywords): | |
return functools.partial(func, *args, **keywords) | |
register(functools.partial, _reduce_partial) | |
# | |
# Make sockets picklable | |
# | |
if sys.platform == 'win32': | |
def _reduce_socket(s): | |
from .resource_sharer import DupSocket | |
return _rebuild_socket, (DupSocket(s),) | |
def _rebuild_socket(ds): | |
return ds.detach() | |
register(socket.socket, _reduce_socket) | |
else: | |
def _reduce_socket(s): | |
df = DupFd(s.fileno()) | |
return _rebuild_socket, (df, s.family, s.type, s.proto) | |
def _rebuild_socket(df, family, type, proto): | |
fd = df.detach() | |
return socket.socket(family, type, proto, fileno=fd) | |
register(socket.socket, _reduce_socket) | |
class AbstractReducer(metaclass=ABCMeta): | |
'''Abstract base class for use in implementing a Reduction class | |
suitable for use in replacing the standard reduction mechanism | |
used in multiprocessing.''' | |
ForkingPickler = ForkingPickler | |
register = register | |
dump = dump | |
send_handle = send_handle | |
recv_handle = recv_handle | |
if sys.platform == 'win32': | |
steal_handle = steal_handle | |
duplicate = duplicate | |
DupHandle = DupHandle | |
else: | |
sendfds = sendfds | |
recvfds = recvfds | |
DupFd = DupFd | |
_reduce_method = _reduce_method | |
_reduce_method_descriptor = _reduce_method_descriptor | |
_rebuild_partial = _rebuild_partial | |
_reduce_socket = _reduce_socket | |
_rebuild_socket = _rebuild_socket | |
def __init__(self, *args): | |
register(type(_C().f), _reduce_method) | |
register(type(list.append), _reduce_method_descriptor) | |
register(type(int.__add__), _reduce_method_descriptor) | |
register(functools.partial, _reduce_partial) | |
register(socket.socket, _reduce_socket) | |