"""Storage provider backends for Memory caching."""

import collections
import datetime
import json
import operator
import os
import os.path
import re
import shutil
import threading
import time
import warnings
from abc import ABCMeta, abstractmethod
from pickle import PicklingError

from . import numpy_pickle
from .backports import concurrency_safe_rename
from .disk import memstr_to_bytes, mkdirp, rm_subdirs
from .logger import format_time

CacheItemInfo = collections.namedtuple("CacheItemInfo", "path size last_access")
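
# Illustrative sketch only (the path and values below are made up): a
# filesystem store's get_items() yields one CacheItemInfo per cached call:
#
#   CacheItemInfo(
#       path="/tmp/joblib/module/func/d41d8cd98f00b204e9800998ecf8427e",
#       size=4096,
#       last_access=datetime.datetime(2024, 1, 1, 12, 0),
#   )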


class CacheWarning(Warning):
    """Warning to capture dump failures except for PicklingError."""

    pass


def concurrency_safe_write(object_to_write, filename, write_func):
    """Writes an object into a unique file in a concurrency-safe way."""
    thread_id = id(threading.current_thread())
    temporary_filename = "{}.thread-{}-pid-{}".format(filename, thread_id, os.getpid())
    write_func(object_to_write, temporary_filename)
    return temporary_filename
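
# A minimal usage sketch, assuming a plain bytes payload and a local
# filesystem (write_bytes and the paths are hypothetical). The caller is
# responsible for the final rename that atomically publishes the file, as
# StoreBackendMixin._concurrency_safe_write does below:
#
#   def write_bytes(payload, dest_filename):
#       with open(dest_filename, "wb") as f:
#           f.write(payload)
#
#   tmp = concurrency_safe_write(b"data", "/tmp/cache/output.pkl", write_bytes)
#   os.replace(tmp, "/tmp/cache/output.pkl")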


class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StoreBackend must implement."""

    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item on the store and returns a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: a file-like object
            The file-like object where an item is stored and retrieved
        mode: string, optional
            The mode in which the file-like object is opened. Allowed values
            are 'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or the absolute path of a file.
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g. filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            Contains a dictionary of named parameters used to configure the
            store backend.
        """


class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. The StoreBackend also has to provide
    private _open_item, _item_exists and _move_item methods. The _open_item
    method has to have the same signature as the builtin open and return a
    file-like object.
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str."""
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = (
                "{: <16}".format(format_time(time.time() - timestamp))
                if timestamp is not None
                else ""
            )
            signature = os.path.basename(call_id[0])
            if metadata is not None and "input_args" in metadata:
                kwargs = ", ".join(
                    "{}={}".format(*item) for item in metadata["input_args"].items()
                )
                signature += "({})".format(kwargs)
            msg = "[Memory]{}: Loading {}".format(ts_string, signature)
            if verbose < 10:
                print("{0}...".format(msg))
            else:
                print("{0} from {1}".format(msg, full_path))

        mmap_mode = None if not hasattr(self, "mmap_mode") else self.mmap_mode

        filename = os.path.join(full_path, "output.pkl")
        if not self._item_exists(filename):
            raise KeyError(
                "Non-existing item (may have been "
                "cleared).\nFile %s does not exist" % filename
            )

        # A file-like object cannot be used when mmap_mode is set: the memory
        # map needs a real filename.
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item
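
    # Illustrative sketch only (paths are hypothetical): a call_id is a list
    # of path components, typically [<function folder>, <args hash>], e.g.:
    #
    #   call_id = ["module/func", "d41d8cd98f00b204e9800998ecf8427e"]
    #   if backend.contains_item(call_id):
    #       result = backend.load_item(call_id)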

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str."""
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, "output.pkl")
            if verbose > 10:
                print("Persisting in %s" % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # Unpicklable outputs only warn for now; as the
                        # message says, this is planned to become an error.
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning,
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning,
            )
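
    # A minimal usage sketch (the backend instance and call_id are
    # hypothetical): dumping never raises on failure, it warns, so callers
    # can treat the cache as best-effort:
    #
    #   backend.dump_item(call_id, computed_result, verbose=0)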

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, "output.pkl")

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about item."""
        return {"location": os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return actual metadata of an item."""
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, "metadata.json")
            with self._open_item(filename, "rb") as f:
                return json.loads(f.read().decode("utf-8"))
        except Exception:
            # Missing or unreadable metadata is not an error.
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, "metadata.json")

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode("utf-8"))

            self._concurrency_safe_write(metadata, filename, write_func)
        except Exception:
            # Storing metadata is best-effort.
            pass

    def contains_path(self, call_id):
        """Check if the cached function is available in the store."""
        func_path = os.path.join(self.location, *call_id)
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)
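
    # Illustrative sketch only: metadata is stored as JSON alongside the
    # pickled output, and reading it back returns {} on any failure:
    #
    #   backend.store_metadata(call_id, {"duration": 0.5})
    #   backend.get_metadata(call_id)  # -> {"duration": 0.5} (or {})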

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, "wb") as f:
                f.write(func_code.encode("utf-8"))

    def get_cached_func_code(self, call_id):
        """Get the code of the cached function."""
        filename = os.path.join(self.location, *call_id, "func_code.py")
        with self._open_item(filename, "rb") as f:
            return f.read().decode("utf-8")

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {"location": os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(bytes_limit, items_limit, age_limit)

        for item in items_to_delete:
            if self.verbose > 10:
                print("Deleting item {0}".format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # clear_location can raise OSError (e.g. a stale file handle)
                # if another process has already deleted the folder; ignore
                # it, since the item is gone either way.
                pass

    def _get_items_to_delete(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            if age_limit.total_seconds() < 0:
                raise ValueError("age_limit has to be a positive timedelta")
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0
            and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # Delete the least recently accessed items first.
        items.sort(key=operator.attrgetter("last_access"))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete
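
    # A minimal usage sketch (the limits are hypothetical): keep the store
    # under 1 GB and 1000 items, and drop anything untouched for a week.
    # bytes_limit accepts either an int or a memory string such as "1G":
    #
    #   backend.enforce_store_limits(
    #       bytes_limit="1G",
    #       items_limit=1000,
    #       age_limit=datetime.timedelta(days=7),
    #   )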

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write, filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location
        )
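

# A minimal sketch of a custom backend, assuming only the abstract API above
# (DictStoreBackend is hypothetical and not part of this module). It shows
# which six methods a subclass supplies; a real backend would also need
# thread-safety and a complete file-like abstraction:
#
#   import io
#
#   class DictStoreBackend(StoreBackendBase, StoreBackendMixin):
#       def configure(self, location, verbose=0, backend_options=dict()):
#           self.location = location
#           self.verbose = verbose
#           self.compress = backend_options.get("compress", False)
#           self._store = {}
#
#       def create_location(self, location):
#           pass  # nothing to create in a dict
#
#       def clear_location(self, location):
#           for key in [k for k in self._store if k.startswith(location)]:
#               del self._store[key]
#
#       def get_items(self):
#           return []  # size accounting not implemented in this sketch
#
#       def _open_item(self, f, mode):
#           if mode == "rb":
#               return io.BytesIO(self._store[f])
#           ...  # 'wb' needs a buffer flushed back into the dict on close
#
#       def _item_exists(self, location):
#           return location in self._store
#
#       def _move_item(self, src, dst):
#           self._store[dst] = self._store.pop(src)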


class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if location == self.location:
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create object location on store."""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        items = []

        for dirpath, _, filenames in os.walk(self.location):
            is_cache_hash_dir = re.match("[a-f0-9]{32}", os.path.basename(dirpath))

            if is_cache_hash_dir:
                output_filename = os.path.join(dirpath, "output.pkl")
                try:
                    last_access = os.path.getatime(output_filename)
                except OSError:
                    try:
                        last_access = os.path.getatime(dirpath)
                    except OSError:
                        # The directory has already been deleted, e.g. by
                        # another process; skip it.
                        continue

                last_access = datetime.datetime.fromtimestamp(last_access)
                try:
                    full_filenames = [os.path.join(dirpath, fn) for fn in filenames]
                    dirsize = sum(os.path.getsize(fn) for fn in full_filenames)
                except OSError:
                    # Either output_filename or one of the files in dirpath
                    # does not exist any more: the directory is likely being
                    # deleted by another process, so skip it.
                    continue

                items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return items
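
    # Illustrative sketch only (paths made up): get_items() walks the cache
    # and keeps directories whose basename is a 32-character hex hash, e.g.:
    #
    #   /tmp/joblib/module/func/d41d8cd98f00b204e9800998ecf8427e/
    #       output.pkl       <- the pickled result
    #       metadata.json    <- call metadata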

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'
        """
        if backend_options is None:
            backend_options = {}

        # Set up the location directory.
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # Automatically add a .gitignore so the cached files are not tracked
        # by version control: it is written to the parent folder when the
        # location is a directory named "joblib", otherwise to the location
        # itself.
        cache_directory = (
            os.path.dirname(location)
            if os.path.dirname(location) and os.path.basename(location) == "joblib"
            else location
        )
        with open(os.path.join(cache_directory, ".gitignore"), "w") as file:
            file.write("# Created by joblib automatically.\n")
            file.write("*\n")

        # Items can be stored compressed for faster I/O.
        self.compress = backend_options.get("compress", False)

        # mmap_mode is only usable for uncompressed items: memory mapping is
        # incompatible with compression.
        mmap_mode = backend_options.get("mmap_mode")
        if self.compress and mmap_mode is not None:
            warnings.warn(
                "Compressed items cannot be memmapped in a "
                "filesystem store. Option will be ignored.",
                stacklevel=2,
            )

        self.mmap_mode = mmap_mode
        self.verbose = verbose
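

# A minimal end-to-end usage sketch, assuming a local cache directory (the
# path and call_id are hypothetical). Typical users go through joblib.Memory
# rather than driving the backend directly:
#
#   backend = FileSystemStoreBackend()
#   backend.configure("/tmp/joblib_cache", verbose=0)
#   call_id = ["my_func", "0" * 32]
#   backend.dump_item(call_id, {"answer": 42})
#   backend.load_item(call_id)  # -> {"answer": 42}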