# spam-classifier/venv/lib/python3.11/site-packages/joblib/externals/loky/backend/context.py
###############################################################################
# Basic context management with LokyContext
#
# author: Thomas Moreau and Olivier Grisel
#
# adapted from multiprocessing/context.py
#  * Create a context ensuring loky uses only objects that are compatible
#    with it
#  * Add LokyContext to the list of contexts of multiprocessing so loky can
#    be used with multiprocessing.set_start_method
#  * Implement a CFS-aware and physical-core aware cpu_count function.
#
import os | |
import sys | |
import math | |
import subprocess | |
import traceback | |
import warnings | |
import multiprocessing as mp | |
from multiprocessing import get_context as mp_get_context | |
from multiprocessing.context import BaseContext | |
from concurrent.futures.process import _MAX_WINDOWS_WORKERS | |
from .process import LokyProcess, LokyInitMainProcess | |
# Apparently, on older Python versions, loky cannot work with 61 workers on
# Windows but only with 60: ¯\_(ツ)_/¯
if sys.version_info < (3, 10):
    _MAX_WINDOWS_WORKERS -= 1

# Start methods accepted by ``set_start_method``; ``fork`` and
# ``forkserver`` are only offered on non-Windows platforms.
START_METHODS = ["loky", "loky_init_main", "spawn"] + (
    [] if sys.platform == "win32" else ["fork", "forkserver"]
)

# Default start method chosen through ``set_start_method``. ``None`` makes
# ``get_context`` fall back to "loky".
_DEFAULT_START_METHOD = None

# Cache for the number of physical cores to avoid repeating subprocess calls.
# It should not change during the lifetime of the program.
physical_cores_cache = None
def get_context(method=None):
    """Return the multiprocessing context registered under ``method``.

    When ``method`` is falsy, the default chosen with ``set_start_method``
    is used, or "loky" when no default was chosen. Explicitly requesting
    "fork" emits a ``UserWarning`` since it is discouraged with loky.

    Raises ``ValueError`` if ``method`` is not a known context name.
    """
    # Try to overload the default context
    method = method or _DEFAULT_START_METHOD or "loky"
    if method == "fork":
        # If 'fork' is explicitly requested, warn user about potential issues.
        warnings.warn(
            "`fork` start method should not be used with "
            "`loky` as it does not respect POSIX. Try using "
            "`spawn` or `loky` instead.",
            UserWarning,
        )
    try:
        return mp_get_context(method)
    except ValueError:
        # Replace multiprocessing's error with one listing loky's start
        # methods. ``from None`` avoids the misleading "During handling of the
        # above exception, another exception occurred" traceback.
        raise ValueError(
            f"Unknown context '{method}'. Value should be in "
            f"{START_METHODS}."
        ) from None
def set_start_method(method, force=False):
    """Set the default start method used by ``get_context``.

    ``method`` must be ``None`` or one of ``START_METHODS``. Once a non-None
    default has been set, changing it again requires ``force=True``.

    Raises ``RuntimeError`` if a default is already set and ``force`` is
    False, and ``AssertionError`` if ``method`` is not a valid start method.
    """
    global _DEFAULT_START_METHOD
    if _DEFAULT_START_METHOD is not None and not force:
        raise RuntimeError("context has already been set")
    if method is not None and method not in START_METHODS:
        # Raised explicitly rather than through an ``assert`` statement so
        # the check still runs under ``python -O``. AssertionError is kept as
        # the type for backward compatibility with existing callers.
        raise AssertionError(
            f"'{method}' is not a valid start_method. It should be in "
            f"{START_METHODS}"
        )
    _DEFAULT_START_METHOD = method
def get_start_method():
    """Return the default start method chosen with ``set_start_method``.

    ``None`` is returned while no default has been chosen.
    """
    default_method = _DEFAULT_START_METHOD
    return default_method
def cpu_count(only_physical_cores=False):
    """Return the number of CPUs the current process can use.

    The returned number of CPUs accounts for:
     * the number of CPUs in the system, as given by
       ``multiprocessing.cpu_count``;
     * the CPU affinity settings of the current process
       (available on some Unix systems);
     * Cgroup CPU bandwidth limit (available on Linux only, typically
       set by docker and similar container orchestration systems);
     * the value of the LOKY_MAX_CPU_COUNT environment variable if defined.
    and is given as the minimum of these constraints.

    If ``only_physical_cores`` is True, return the number of physical cores
    instead of the number of logical cores (hyperthreading / SMT). Note that
    this option is not enforced if the number of usable cores is controlled in
    any other way such as: process affinity, Cgroup restricted CPU bandwidth
    or the LOKY_MAX_CPU_COUNT environment variable. If the number of physical
    cores is not found, return the number of logical cores.

    Note that on Windows, the returned number of CPUs cannot exceed 61 (or 60
    for Python < 3.10), see:
    https://bugs.python.org/issue26903.

    It is also always greater than or equal to 1.
    """
    # Note: os.cpu_count() is allowed to return None in its docstring
    os_cpu_count = os.cpu_count() or 1
    if sys.platform == "win32":
        # On Windows, attempting to use more than 61 CPUs would result in a
        # OS-level error. See https://bugs.python.org/issue26903. According to
        # https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
        # it might be possible to go beyond with a lot of extra work but this
        # does not look easy.
        os_cpu_count = min(os_cpu_count, _MAX_WINDOWS_WORKERS)

    cpu_count_user = _cpu_count_user(os_cpu_count)
    aggregate_cpu_count = max(min(os_cpu_count, cpu_count_user), 1)

    if not only_physical_cores:
        return aggregate_cpu_count

    if cpu_count_user < os_cpu_count:
        # Respect user setting
        return max(cpu_count_user, 1)

    cpu_count_physical, exception = _count_physical_cores()
    if cpu_count_physical != "not found":
        # The physical-core probes know nothing about the Windows cap applied
        # to os_cpu_count above; clamp so the documented limit also holds on
        # this path (e.g. a Windows host with more than 61 physical cores).
        return min(cpu_count_physical, os_cpu_count)

    # Fallback to default behavior
    if exception is not None:
        # _count_physical_cores caches its result and only reports the
        # exception on its first call, so this warns only the first time.
        warnings.warn(
            "Could not find the number of physical cores for the "
            f"following reason:\n{exception}\n"
            "Returning the number of logical cores instead. You can "
            "silence this warning by setting LOKY_MAX_CPU_COUNT to "
            "the number of cores you want to use."
        )
        traceback.print_tb(exception.__traceback__)

    return aggregate_cpu_count
def _cpu_count_cgroup(os_cpu_count): | |
# Cgroup CPU bandwidth limit available in Linux since 2.6 kernel | |
cpu_max_fname = "/sys/fs/cgroup/cpu.max" | |
cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us" | |
cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us" | |
if os.path.exists(cpu_max_fname): | |
# cgroup v2 | |
# https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html | |
with open(cpu_max_fname) as fh: | |
cpu_quota_us, cpu_period_us = fh.read().strip().split() | |
elif os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname): | |
# cgroup v1 | |
# https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management | |
with open(cfs_quota_fname) as fh: | |
cpu_quota_us = fh.read().strip() | |
with open(cfs_period_fname) as fh: | |
cpu_period_us = fh.read().strip() | |
else: | |
# No Cgroup CPU bandwidth limit (e.g. non-Linux platform) | |
cpu_quota_us = "max" | |
cpu_period_us = 100_000 # unused, for consistency with default values | |
if cpu_quota_us == "max": | |
# No active Cgroup quota on a Cgroup-capable platform | |
return os_cpu_count | |
else: | |
cpu_quota_us = int(cpu_quota_us) | |
cpu_period_us = int(cpu_period_us) | |
if cpu_quota_us > 0 and cpu_period_us > 0: | |
return math.ceil(cpu_quota_us / cpu_period_us) | |
else: # pragma: no cover | |
# Setting a negative cpu_quota_us value is a valid way to disable | |
# cgroup CPU bandwith limits | |
return os_cpu_count | |
def _cpu_count_affinity(os_cpu_count): | |
# Number of available CPUs given affinity settings | |
if hasattr(os, "sched_getaffinity"): | |
try: | |
return len(os.sched_getaffinity(0)) | |
except NotImplementedError: | |
pass | |
# On some platforms, os.sched_getaffinity does not exist or raises | |
# NotImplementedError, let's try with the psutil if installed. | |
try: | |
import psutil | |
p = psutil.Process() | |
if hasattr(p, "cpu_affinity"): | |
return len(p.cpu_affinity()) | |
except ImportError: # pragma: no cover | |
if ( | |
sys.platform == "linux" | |
and os.environ.get("LOKY_MAX_CPU_COUNT") is None | |
): | |
# Some platforms don't implement os.sched_getaffinity on Linux which | |
# can cause severe oversubscription problems. Better warn the | |
# user in this particularly pathological case which can wreck | |
# havoc, typically on CI workers. | |
warnings.warn( | |
"Failed to inspect CPU affinity constraints on this system. " | |
"Please install psutil or explictly set LOKY_MAX_CPU_COUNT." | |
) | |
# This can happen for platforms that do not implement any kind of CPU | |
# infinity such as macOS-based platforms. | |
return os_cpu_count | |
def _cpu_count_user(os_cpu_count):
    """Return the tightest user-imposed limit on the number of CPUs.

    This is the minimum of the CPU affinity count, the Cgroup CPU bandwidth
    limit and the LOKY_MAX_CPU_COUNT environment variable (``os_cpu_count``
    when the variable is unset).
    """
    limits = (
        _cpu_count_affinity(os_cpu_count),
        _cpu_count_cgroup(os_cpu_count),
        # User defined soft-limit passed as a loky specific environment
        # variable.
        int(os.environ.get("LOKY_MAX_CPU_COUNT", os_cpu_count)),
    )
    return min(limits)
def _count_physical_cores():
    """Return a tuple ``(number of physical cores, exception)``.

    On success the exception is ``None``. On failure the count is the string
    ``"not found"`` and the exception is the error that occurred.

    The outcome (including ``"not found"``) is stored in the module global
    ``physical_cores_cache`` so the platform probe runs at most once; calls
    answered from the cache always report ``None`` as the exception.
    """
    global physical_cores_cache

    # Already computed by a previous call: reuse it.
    if physical_cores_cache is not None:
        return physical_cores_cache, None

    error = None
    try:
        probes = {
            "linux": _count_physical_cores_linux,
            "win32": _count_physical_cores_win32,
            "darwin": _count_physical_cores_darwin,
        }
        probe = probes.get(sys.platform)
        if probe is None:
            raise NotImplementedError(f"unsupported platform: {sys.platform}")
        n_physical = probe()
        # A count below 1 means the probe did not find a valid value.
        if n_physical < 1:
            raise ValueError(f"found {n_physical} physical cores < 1")
    except Exception as e:
        error = e
        n_physical = "not found"

    physical_cores_cache = n_physical
    return n_physical, error
def _count_physical_cores_linux(): | |
try: | |
cpu_info = subprocess.run( | |
"lscpu --parse=core".split(), capture_output=True, text=True | |
) | |
cpu_info = cpu_info.stdout.splitlines() | |
cpu_info = {line for line in cpu_info if not line.startswith("#")} | |
return len(cpu_info) | |
except: | |
pass # fallback to /proc/cpuinfo | |
cpu_info = subprocess.run( | |
"cat /proc/cpuinfo".split(), capture_output=True, text=True | |
) | |
cpu_info = cpu_info.stdout.splitlines() | |
cpu_info = {line for line in cpu_info if line.startswith("core id")} | |
return len(cpu_info) | |
def _count_physical_cores_win32(): | |
try: | |
cmd = "-Command (Get-CimInstance -ClassName Win32_Processor).NumberOfCores" | |
cpu_info = subprocess.run( | |
f"powershell.exe {cmd}".split(), | |
capture_output=True, | |
text=True, | |
) | |
cpu_info = cpu_info.stdout.splitlines() | |
return int(cpu_info[0]) | |
except: | |
pass # fallback to wmic (older Windows versions; deprecated now) | |
cpu_info = subprocess.run( | |
"wmic CPU Get NumberOfCores /Format:csv".split(), | |
capture_output=True, | |
text=True, | |
) | |
cpu_info = cpu_info.stdout.splitlines() | |
cpu_info = [ | |
l.split(",")[1] for l in cpu_info if (l and l != "Node,NumberOfCores") | |
] | |
return sum(map(int, cpu_info)) | |
def _count_physical_cores_darwin(): | |
cpu_info = subprocess.run( | |
"sysctl -n hw.physicalcpu".split(), | |
capture_output=True, | |
text=True, | |
) | |
cpu_info = cpu_info.stdout | |
return int(cpu_info) | |
class LokyContext(BaseContext):
    """Multiprocessing context whose processes are ``LokyProcess`` objects.

    Queues come from loky's ``queues`` module and, on non-Windows platforms,
    synchronization primitives come from loky's ``synchronize`` module.
    """

    _name = "loky"
    Process = LokyProcess
    cpu_count = staticmethod(cpu_count)

    def Queue(self, maxsize=0, reducers=None):
        """Return a loky ``Queue`` created for this context."""
        from . import queues

        return queues.Queue(
            maxsize, reducers=reducers, ctx=self.get_context()
        )

    def SimpleQueue(self, reducers=None):
        """Return a loky ``SimpleQueue`` created for this context."""
        from . import queues

        return queues.SimpleQueue(reducers=reducers, ctx=self.get_context())

    if sys.platform != "win32":
        # For Unix platforms, use our custom implementation of synchronize,
        # ensuring that loky.backend.resource_tracker is used to clean up the
        # semaphores in case of a worker crash.

        def Semaphore(self, value=1):
            """Return a loky semaphore object."""
            from . import synchronize

            return synchronize.Semaphore(value=value)

        def BoundedSemaphore(self, value):
            """Return a loky bounded semaphore object."""
            from . import synchronize

            return synchronize.BoundedSemaphore(value)

        def Lock(self):
            """Return a loky lock object."""
            from . import synchronize

            return synchronize.Lock()

        def RLock(self):
            """Return a loky reentrant (recursive) lock object."""
            from . import synchronize

            return synchronize.RLock()

        def Condition(self, lock=None):
            """Return a loky condition object, optionally using ``lock``."""
            from . import synchronize

            return synchronize.Condition(lock)

        def Event(self):
            """Return a loky event object."""
            from . import synchronize

            return synchronize.Event()
class LokyInitMainContext(LokyContext):
    """Variant of ``LokyContext`` using ``LokyInitMainProcess``, which loads
    the ``main`` module in the workers.

    It exists for compatibility when ``cloudpickle`` is not available on the
    running system: functions defined in the ``main`` module can then be
    used, provided the usual safeguards are followed. The ``executor`` must
    be created under ``if __name__ == "__main__":``, while the functions and
    variables it uses from main must be defined outside of that block.

    This mimics the default behavior of multiprocessing on Windows and of the
    ``spawn`` start method on posix systems. See the end of this section of
    the Python documentation for details:
    https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    """

    Process = LokyInitMainProcess
    _name = "loky_init_main"
# Register the loky contexts in multiprocessing's table of concrete contexts
# so that multiprocessing.get_context("loky") and
# multiprocessing.get_context("loky_init_main") can find them.
ctx_loky = LokyContext()
mp.context._concrete_contexts.update(
    loky=ctx_loky,
    loky_init_main=LokyInitMainContext(),
)