|
"""Utilities for fast persistence of big data, with optional compression.""" |
|
|
|
|
|
|
|
|
|
|
|
import contextlib |
|
import io |
|
import pickle |
|
import sys |
|
import warnings |
|
|
|
from .compressor import _COMPRESSORS, _ZFILE_PREFIX |
|
|
|
try: |
|
import numpy as np |
|
except ImportError: |
|
np = None |
|
|
|
# Use the pure-Python pickler/unpickler implementations — presumably so
# that subclasses elsewhere in the package can override their internals
# (the C-accelerated versions do not support that) — TODO confirm.
Unpickler = pickle._Unpickler
Pickler = pickle._Pickler

# Python 2 compatibility leftover: on Python 3, xrange is just range.
xrange = range
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
import bz2 |
|
except ImportError: |
|
bz2 = None |
|
|
|
|
|
_IO_BUFFER_SIZE = 1024**2 |
|
|
|
|
|
def _is_raw_file(fileobj): |
|
"""Check if fileobj is a raw file object, e.g created with open.""" |
|
fileobj = getattr(fileobj, "raw", fileobj) |
|
return isinstance(fileobj, io.FileIO) |
|
|
|
|
|
def _get_prefixes_max_len():
    """Return the length of the longest known magic-number prefix.

    Considers the legacy zfile prefix plus the prefix of every registered
    compressor; used to decide how many bytes to peek when sniffing a file.
    """
    lengths = [len(_ZFILE_PREFIX)]
    lengths.extend(len(c.prefix) for c in _COMPRESSORS.values())
    return max(lengths)
|
|
|
|
|
def _is_numpy_array_byte_order_mismatch(array): |
|
"""Check if numpy array is having byte order mismatch""" |
|
return ( |
|
sys.byteorder == "big" |
|
and ( |
|
array.dtype.byteorder == "<" |
|
or ( |
|
array.dtype.byteorder == "|" |
|
and array.dtype.fields |
|
and all(e[0].byteorder == "<" for e in array.dtype.fields.values()) |
|
) |
|
) |
|
) or ( |
|
sys.byteorder == "little" |
|
and ( |
|
array.dtype.byteorder == ">" |
|
or ( |
|
array.dtype.byteorder == "|" |
|
and array.dtype.fields |
|
and all(e[0].byteorder == ">" for e in array.dtype.fields.values()) |
|
) |
|
) |
|
) |
|
|
|
|
|
def _ensure_native_byte_order(array):
    """Use the byte order of the host while preserving values

    Does nothing if array already uses the system byte order.
    """
    if not _is_numpy_array_byte_order_mismatch(array):
        return array
    # byteswap() flips the raw bytes; viewing with the native-order dtype
    # ('=') keeps the logical values identical.
    return array.byteswap().view(array.dtype.newbyteorder("="))
|
|
|
|
|
|
|
|
|
def _detect_compressor(fileobj):
    """Return the compressor matching fileobj.

    Parameters
    ----------
    fileobj: file object

    Returns
    -------
    str in {'zlib', 'gzip', 'bz2', 'lzma', 'xz', 'compat', 'not-compressed'}
    """
    max_prefix_len = _get_prefixes_max_len()
    peek = getattr(fileobj, "peek", None)
    if peek is not None:
        # peek() does not advance the stream position.
        first_bytes = peek(max_prefix_len)
    else:
        # No peek support: read then rewind so callers see a fresh stream.
        first_bytes = fileobj.read(max_prefix_len)
        fileobj.seek(0)

    # Legacy (joblib < 0.10) zfile format gets special treatment.
    if first_bytes.startswith(_ZFILE_PREFIX):
        return "compat"
    for name, compressor in _COMPRESSORS.items():
        if first_bytes.startswith(compressor.prefix):
            return name
    return "not-compressed"
|
|
|
|
|
def _buffered_read_file(fobj):
    """Wrap ``fobj`` in a 1 MB ``io.BufferedReader`` for faster reads."""
    return io.BufferedReader(fobj, buffer_size=_IO_BUFFER_SIZE)
|
|
|
|
|
def _buffered_write_file(fobj):
    """Wrap ``fobj`` in a 1 MB ``io.BufferedWriter`` for faster writes."""
    return io.BufferedWriter(fobj, buffer_size=_IO_BUFFER_SIZE)
|
|
|
|
|
@contextlib.contextmanager
def _validate_fileobject_and_memmap(fileobj, filename, mmap_mode=None):
    """Utility function opening the right fileobject from a filename.

    The magic number is used to choose between the type of file object to open:
    * regular file object (default)
    * zlib file object
    * gzip file object
    * bz2 file object
    * lzma file object (for xz and lzma compressor)

    Parameters
    ----------
    fileobj: file object
    filename: str
        filename path corresponding to the fileobj parameter.
    mmap_mode: str
        memory map mode that should be used to open the pickle file. This
        parameter is useful to verify that the user is not trying to memory
        map a compressed or in-memory file. Default: None.

    Returns
    -------
    a tuple with a file like object, and the validated mmap_mode.

    """
    # Sniff the stream's magic number to pick the right decompressor.
    compressor = _detect_compressor(fileobj)
    validated_mmap_mode = mmap_mode

    if compressor == "compat":
        # Legacy zfile format (joblib < 0.10): the old loader works from the
        # filename rather than the open file object.
        warnings.warn(
            "The file '%s' has been generated with a joblib "
            "version less than 0.10. "
            "Please regenerate this pickle file." % filename,
            DeprecationWarning,
            stacklevel=2,
        )
        yield filename, validated_mmap_mode
    else:
        if compressor in _COMPRESSORS:
            # Wrap the stream in the matching decompressor, then buffer the
            # decompressed reads for speed.
            compressor_wrapper = _COMPRESSORS[compressor]
            inst = compressor_wrapper.decompressor_file(fileobj)
            fileobj = _buffered_read_file(inst)

        # Memory mapping only makes sense for an uncompressed, on-disk raw
        # file; warn and drop mmap_mode otherwise.
        # NOTE: the warning messages below interpolate local variable names
        # via ``% locals()`` — do not rename ``mmap_mode``/``filename``/
        # ``fileobj`` without updating them.
        if mmap_mode is not None:
            validated_mmap_mode = None
            if isinstance(fileobj, io.BytesIO):
                warnings.warn(
                    "In memory persistence is not compatible with "
                    'mmap_mode "%(mmap_mode)s" flag passed. '
                    "mmap_mode option will be ignored." % locals(),
                    stacklevel=2,
                )
            elif compressor != "not-compressed":
                warnings.warn(
                    'mmap_mode "%(mmap_mode)s" is not compatible '
                    "with compressed file %(filename)s. "
                    '"%(mmap_mode)s" flag will be ignored.' % locals(),
                    stacklevel=2,
                )
            elif not _is_raw_file(fileobj):
                warnings.warn(
                    '"%(fileobj)r" is not a raw file, mmap_mode '
                    '"%(mmap_mode)s" flag will be ignored.' % locals(),
                    stacklevel=2,
                )
            else:
                validated_mmap_mode = mmap_mode

        yield fileobj, validated_mmap_mode
|
|
|
|
|
def _write_fileobject(filename, compress=("zlib", 3)):
    """Return the right compressor file object in write mode.

    Parameters
    ----------
    filename: str
        Path of the file to create.
    compress: tuple
        ``(method_name, compress_level)``. An unregistered method name
        silently falls back to zlib (original behavior, preserved).

    Returns
    -------
    A buffered, compressing file object opened for writing.
    """
    compressmethod, compresslevel = compress[0], compress[1]
    # Both branches of the original code were identical apart from the
    # compressor name; collapse them into a single path with an explicit
    # zlib fallback for unknown methods.
    if compressmethod not in _COMPRESSORS:
        compressmethod = "zlib"
    file_instance = _COMPRESSORS[compressmethod].compressor_file(
        filename, compresslevel=compresslevel
    )
    return _buffered_write_file(file_instance)
|
|
|
|
|
|
|
|
|
|
|
BUFFER_SIZE = 2**18 |
|
|
|
|
|
def _read_bytes(fp, size, error_template="ran out of data"): |
|
"""Read from file-like object until size bytes are read. |
|
|
|
TODO python2_drop: is it still needed? The docstring mentions python 2.6 |
|
and it looks like this can be at least simplified ... |
|
|
|
Raises ValueError if not EOF is encountered before size bytes are read. |
|
Non-blocking objects only supported if they derive from io objects. |
|
|
|
Required as e.g. ZipExtFile in python 2.6 can return less data than |
|
requested. |
|
|
|
This function was taken from numpy/lib/format.py in version 1.10.2. |
|
|
|
Parameters |
|
---------- |
|
fp: file-like object |
|
size: int |
|
error_template: str |
|
|
|
Returns |
|
------- |
|
a bytes object |
|
The data read in bytes. |
|
|
|
""" |
|
data = bytes() |
|
while True: |
|
|
|
|
|
|
|
try: |
|
r = fp.read(size - len(data)) |
|
data += r |
|
if len(r) == 0 or len(data) == size: |
|
break |
|
except io.BlockingIOError: |
|
pass |
|
if len(data) != size: |
|
msg = "EOF: reading %s, expected %d bytes got %d" |
|
raise ValueError(msg % (error_template, size, len(data))) |
|
else: |
|
return data |
|
|
|
|
|
def _reconstruct(*args, **kwargs): |
|
|
|
|
|
|
|
|
|
|
|
np_major_version = np.__version__[:2] |
|
if np_major_version == "1.": |
|
from numpy.core.multiarray import _reconstruct as np_reconstruct |
|
elif np_major_version == "2.": |
|
from numpy._core.multiarray import _reconstruct as np_reconstruct |
|
|
|
return np_reconstruct(*args, **kwargs) |
|
|