File size: 6,926 Bytes
7885a28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
###############################################################################
# Customizable Pickler with some basic reducers
#
# author: Thomas Moreau
#
# adapted from multiprocessing/reduction.py (17/02/2017)
# * Replace the ForkingPickler with a similar _LokyPickler,
# * Add CustomizablePickler to allow customizing the pickling process
# on the fly.
#
import copyreg
import io
import functools
import types
import sys
import os
from multiprocessing import util
from pickle import loads, HIGHEST_PROTOCOL
###############################################################################
# Enable custom pickling in Loky.
# Module-level registry mapping a type to the reducer loky should use for it.
_dispatch_table = {}


def register(type_, reduce_function):
    """Record ``reduce_function`` as the loky reducer for ``type_``.

    The mapping is stored in the module-level ``_dispatch_table``; an entry
    that already exists for the same type is overwritten.
    """
    _dispatch_table.update({type_: reduce_function})
###############################################################################
# Register extra pickling routines to improve pickling support in loky
# make methods picklable
def _reduce_method(m):
    """Reduce a method object to a ``getattr`` lookup on its owner.

    Returns ``(getattr, (owner, name))`` where ``name`` is the name of the
    function wrapped by the method and ``owner`` is ``m.__self__``, or
    ``m.__class__`` when ``m.__self__`` is ``None``.
    """
    owner = m.__class__ if m.__self__ is None else m.__self__
    method_name = m.__func__.__name__
    return getattr, (owner, method_name)
class _C:
    """Helper class whose bound methods supply the method types to register."""

    def f(self):
        """Do nothing; exists so that ``type(_C().f)`` can be taken."""

    @classmethod
    def h(cls):
        """Do nothing; exists so that ``type(_C.h)`` can be taken."""
# Use _reduce_method for the type of a method bound to an instance and for
# the type of a classmethod bound to its class.
register(type(_C().f), _reduce_method)
register(type(_C.h), _reduce_method)
def _reduce_method_descriptor(m):
    """Reduce a method descriptor to a ``getattr`` lookup on its class.

    Returns ``(getattr, (m.__objclass__, m.__name__))``.
    """
    owner, attr_name = m.__objclass__, m.__name__
    return getattr, (owner, attr_name)
# Use _reduce_method_descriptor for the types of ``list.append`` and
# ``int.__add__`` as looked up on their classes.
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
# Make functools.partial objects picklable
def _reduce_partial(p):
    """Reduce a partial object to ``(_rebuild_partial, rebuild_args)``.

    ``rebuild_args`` holds the wrapped callable, its positional arguments and
    its keyword arguments; a falsy ``p.keywords`` is replaced by an empty
    dict.
    """
    rebuild_args = (p.func, p.args, p.keywords or {})
    return _rebuild_partial, rebuild_args
def _rebuild_partial(func, args, keywords):
    """Build a ``functools.partial`` from a callable, its args and keywords.

    ``args`` is unpacked as positional arguments and ``keywords`` as keyword
    arguments of the partial.
    """
    rebuilt = functools.partial(func, *args, **keywords)
    return rebuilt
# Register _reduce_partial as the reducer for functools.partial objects.
register(functools.partial, _reduce_partial)
if sys.platform != "win32":
from ._posix_reduction import _mk_inheritable # noqa: F401
else:
from . import _win_reduction # noqa: F401
# Module-level settings that drive the choice of pickler.
# DEFAULT_ENV is "cloudpickle" when joblib.externals.cloudpickle can be
# imported and "pickle" otherwise.
try:
    from joblib.externals import cloudpickle  # noqa: F401

    DEFAULT_ENV = "cloudpickle"
except ImportError:
    # If cloudpickle is not present, fallback to pickle
    DEFAULT_ENV = "pickle"

# The LOKY_PICKLER environment variable, when set, takes precedence over
# DEFAULT_ENV.
ENV_LOKY_PICKLER = os.environ.get("LOKY_PICKLER", DEFAULT_ENV)
# Pickler class currently in use and the name it was selected with. Both are
# (re)assigned by set_loky_pickler().
_LokyPickler = None
_loky_pickler_name = None


def set_loky_pickler(loky_pickler=None):
    """Select the pickler class used by this module.

    ``loky_pickler`` is the name of the pickler to use. ``None`` falls back to
    ``ENV_LOKY_PICKLER``; if the result is still ``None`` or an empty string,
    ``"cloudpickle"`` is used. ``"cloudpickle"`` selects
    ``joblib.externals.cloudpickle.CloudPickler``; any other value is taken
    as the name of an importable module exposing a ``Pickler`` class.

    A ``CustomizablePickler`` subclass of the selected class is created and
    stored in the module-level ``_LokyPickler``; the name is stored in
    ``_loky_pickler_name``. Calling the function again with the name that is
    already active is a no-op.

    Raises ``ImportError`` or ``AttributeError`` (the original exception,
    with an explanatory note appended to its message) when the module cannot
    be imported or has no ``Pickler`` attribute.
    """
    global _LokyPickler, _loky_pickler_name

    if loky_pickler is None:
        loky_pickler = ENV_LOKY_PICKLER

    # The default loky_pickler is cloudpickle
    if loky_pickler in ["", None]:
        loky_pickler = "cloudpickle"

    # Nothing to do when the requested pickler is already the active one.
    if loky_pickler == _loky_pickler_name:
        return

    if loky_pickler == "cloudpickle":
        from joblib.externals.cloudpickle import (
            CloudPickler as loky_pickler_cls,
        )
    else:
        try:
            from importlib import import_module

            module_pickle = import_module(loky_pickler)
            loky_pickler_cls = module_pickle.Pickler
        except (ImportError, AttributeError) as e:
            extra_info = (
                "\nThis error occurred while setting loky_pickler to"
                f" '{loky_pickler}', as required by the env variable "
                "LOKY_PICKLER or the function set_loky_pickler."
            )
            # The exception may have been raised without arguments or with a
            # non-string first argument; build the message defensively so
            # the original error is never masked by an IndexError/TypeError
            # raised from this handler.
            original_msg = str(e.args[0]) if e.args else ""
            e.args = (original_msg + extra_info,) + e.args[1:]
            # Keep the ``msg`` attribute in sync with the extended message.
            e.msg = e.args[0]
            raise

    # loky_pickler is always a non-empty name at this point.
    util.debug(f"Using '{loky_pickler}' for serialization.")

    class CustomizablePickler(loky_pickler_cls):
        """Pickler whose dispatch table is extended with loky's reducers."""

        _loky_pickler_cls = loky_pickler_cls

        def _set_dispatch_table(self, dispatch_table):
            """Install ``dispatch_table`` on this pickler instance."""
            for ancestor_class in self._loky_pickler_cls.mro():
                dt_attribute = getattr(ancestor_class, "dispatch_table", None)
                if isinstance(dt_attribute, types.MemberDescriptorType):
                    # Ancestor class (typically _pickle.Pickler) has a
                    # member_descriptor for its "dispatch_table" attribute. Use
                    # it to set the dispatch_table as a member instead of a
                    # dynamic attribute in the __dict__ of the instance,
                    # otherwise it will not be taken into account by the C
                    # implementation of the dump method if a subclass defines a
                    # class-level dispatch_table attribute as was done in
                    # cloudpickle 1.6.0:
                    # https://github.com/joblib/loky/pull/260
                    dt_attribute.__set__(self, dispatch_table)
                    break

            # On top of member descriptor set, also use setattr such that code
            # that directly access self.dispatch_table gets a consistent view
            # of the same table.
            self.dispatch_table = dispatch_table

        def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
            """Create a pickler writing to ``writer``.

            ``reducers`` is an optional mapping of type to reducer function
            registered on top of the module-level loky reducers.
            """
            loky_pickler_cls.__init__(self, writer, protocol=protocol)
            if reducers is None:
                reducers = {}

            if hasattr(self, "dispatch_table"):
                # Force a copy that we will update without mutating the
                # any class level defined dispatch_table.
                loky_dt = dict(self.dispatch_table)
            else:
                # Use standard reducers as bases
                loky_dt = copyreg.dispatch_table.copy()

            # Register loky specific reducers
            loky_dt.update(_dispatch_table)

            # Set the new dispatch table, taking care of the fact that we
            # need to use the member_descriptor when we inherit from a
            # subclass of the C implementation of the Pickler base class
            # with an class level dispatch_table attribute.
            self._set_dispatch_table(loky_dt)

            # Register the reducers
            for type_, reduce_func in reducers.items():
                self.register(type_, reduce_func)

        def register(self, type, reduce_func):
            """Attach a reducer function to a given type in the dispatch table."""
            self.dispatch_table[type] = reduce_func

    _LokyPickler = CustomizablePickler
    _loky_pickler_name = loky_pickler
def get_loky_pickler_name():
    """Return the current value of the module-level ``_loky_pickler_name``."""
    return _loky_pickler_name
def get_loky_pickler():
    """Return the current value of the module-level ``_LokyPickler``."""
    return _LokyPickler
# Select the default pickler at import time (no argument, so the name comes
# from ENV_LOKY_PICKLER); this replaces the initial None in _LokyPickler.
set_loky_pickler()
def dump(obj, file, reducers=None, protocol=None):
    """Pickle ``obj`` to ``file`` with the current ``_LokyPickler``.

    ``reducers`` and ``protocol`` are forwarded as keyword arguments to the
    pickler constructor. Nothing is returned.
    """
    pickler = _LokyPickler(file, reducers=reducers, protocol=protocol)
    pickler.dump(obj)
def dumps(obj, reducers=None, protocol=None):
    """Pickle ``obj`` in memory with dump() and return the written data.

    The return value is the ``getbuffer()`` view of the in-memory
    ``io.BytesIO`` that was written to, not a ``bytes`` object.
    """
    stream = io.BytesIO()
    dump(obj, stream, reducers=reducers, protocol=protocol)
    return stream.getbuffer()
# Public names of this module.
__all__ = ["dump", "dumps", "loads", "register", "set_loky_pickler"]

# On Windows, additionally expose multiprocessing.reduction.duplicate.
if sys.platform == "win32":
    from multiprocessing.reduction import duplicate

    __all__ += ["duplicate"]
|