Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| # Copyright 2009 Brian Quinlan. All Rights Reserved. | |
| # Licensed to PSF under a Contributor Agreement. | |
| """Implements ThreadPoolExecutor.""" | |
| __author__ = 'Brian Quinlan ([email protected])' | |
| from concurrent.futures import _base | |
| import itertools | |
| import queue | |
| import threading | |
| import types | |
| import weakref | |
| import os | |
| _threads_queues = weakref.WeakKeyDictionary() | |
| _shutdown = False | |
| # Lock that ensures that new workers are not created while the interpreter is | |
| # shutting down. Must be held while mutating _threads_queues and _shutdown. | |
| _global_shutdown_lock = threading.Lock() | |
| def _python_exit(): | |
| global _shutdown | |
| with _global_shutdown_lock: | |
| _shutdown = True | |
| items = list(_threads_queues.items()) | |
| for t, q in items: | |
| q.put(None) | |
| for t, q in items: | |
| t.join() | |
| # Register for `_python_exit()` to be called just before joining all | |
| # non-daemon threads. This is used instead of `atexit.register()` for | |
| # compatibility with subinterpreters, which no longer support daemon threads. | |
| # See bpo-39812 for context. | |
| threading._register_atexit(_python_exit) | |
| # At fork, reinitialize the `_global_shutdown_lock` lock in the child process | |
| if hasattr(os, 'register_at_fork'): | |
| os.register_at_fork(before=_global_shutdown_lock.acquire, | |
| after_in_child=_global_shutdown_lock._at_fork_reinit, | |
| after_in_parent=_global_shutdown_lock.release) | |
| class _WorkItem(object): | |
| def __init__(self, future, fn, args, kwargs): | |
| self.future = future | |
| self.fn = fn | |
| self.args = args | |
| self.kwargs = kwargs | |
| def run(self): | |
| if not self.future.set_running_or_notify_cancel(): | |
| return | |
| try: | |
| result = self.fn(*self.args, **self.kwargs) | |
| except BaseException as exc: | |
| self.future.set_exception(exc) | |
| # Break a reference cycle with the exception 'exc' | |
| self = None | |
| else: | |
| self.future.set_result(result) | |
| __class_getitem__ = classmethod(types.GenericAlias) | |
| def _worker(executor_reference, work_queue, initializer, initargs): | |
| if initializer is not None: | |
| try: | |
| initializer(*initargs) | |
| except BaseException: | |
| _base.LOGGER.critical('Exception in initializer:', exc_info=True) | |
| executor = executor_reference() | |
| if executor is not None: | |
| executor._initializer_failed() | |
| return | |
| try: | |
| while True: | |
| work_item = work_queue.get(block=True) | |
| if work_item is not None: | |
| work_item.run() | |
| # Delete references to object. See issue16284 | |
| del work_item | |
| # attempt to increment idle count | |
| executor = executor_reference() | |
| if executor is not None: | |
| executor._idle_semaphore.release() | |
| del executor | |
| continue | |
| executor = executor_reference() | |
| # Exit if: | |
| # - The interpreter is shutting down OR | |
| # - The executor that owns the worker has been collected OR | |
| # - The executor that owns the worker has been shutdown. | |
| if _shutdown or executor is None or executor._shutdown: | |
| # Flag the executor as shutting down as early as possible if it | |
| # is not gc-ed yet. | |
| if executor is not None: | |
| executor._shutdown = True | |
| # Notice other workers | |
| work_queue.put(None) | |
| return | |
| del executor | |
| except BaseException: | |
| _base.LOGGER.critical('Exception in worker', exc_info=True) | |
| class BrokenThreadPool(_base.BrokenExecutor): | |
| """ | |
| Raised when a worker thread in a ThreadPoolExecutor failed initializing. | |
| """ | |
| class ThreadPoolExecutor(_base.Executor): | |
| # Used to assign unique thread names when thread_name_prefix is not supplied. | |
| _counter = itertools.count().__next__ | |
| def __init__(self, max_workers=None, thread_name_prefix='', | |
| initializer=None, initargs=()): | |
| """Initializes a new ThreadPoolExecutor instance. | |
| Args: | |
| max_workers: The maximum number of threads that can be used to | |
| execute the given calls. | |
| thread_name_prefix: An optional name prefix to give our threads. | |
| initializer: A callable used to initialize worker threads. | |
| initargs: A tuple of arguments to pass to the initializer. | |
| """ | |
| if max_workers is None: | |
| # ThreadPoolExecutor is often used to: | |
| # * CPU bound task which releases GIL | |
| # * I/O bound task (which releases GIL, of course) | |
| # | |
| # We use cpu_count + 4 for both types of tasks. | |
| # But we limit it to 32 to avoid consuming surprisingly large resource | |
| # on many core machine. | |
| max_workers = min(32, (os.cpu_count() or 1) + 4) | |
| if max_workers <= 0: | |
| raise ValueError("max_workers must be greater than 0") | |
| if initializer is not None and not callable(initializer): | |
| raise TypeError("initializer must be a callable") | |
| self._max_workers = max_workers | |
| self._work_queue = queue.SimpleQueue() | |
| self._idle_semaphore = threading.Semaphore(0) | |
| self._threads = set() | |
| self._broken = False | |
| self._shutdown = False | |
| self._shutdown_lock = threading.Lock() | |
| self._thread_name_prefix = (thread_name_prefix or | |
| ("ThreadPoolExecutor-%d" % self._counter())) | |
| self._initializer = initializer | |
| self._initargs = initargs | |
| def submit(self, fn, /, *args, **kwargs): | |
| with self._shutdown_lock, _global_shutdown_lock: | |
| if self._broken: | |
| raise BrokenThreadPool(self._broken) | |
| if self._shutdown: | |
| raise RuntimeError('cannot schedule new futures after shutdown') | |
| if _shutdown: | |
| raise RuntimeError('cannot schedule new futures after ' | |
| 'interpreter shutdown') | |
| f = _base.Future() | |
| w = _WorkItem(f, fn, args, kwargs) | |
| self._work_queue.put(w) | |
| self._adjust_thread_count() | |
| return f | |
| submit.__doc__ = _base.Executor.submit.__doc__ | |
| def _adjust_thread_count(self): | |
| # if idle threads are available, don't spin new threads | |
| if self._idle_semaphore.acquire(timeout=0): | |
| return | |
| # When the executor gets lost, the weakref callback will wake up | |
| # the worker threads. | |
| def weakref_cb(_, q=self._work_queue): | |
| q.put(None) | |
| num_threads = len(self._threads) | |
| if num_threads < self._max_workers: | |
| thread_name = '%s_%d' % (self._thread_name_prefix or self, | |
| num_threads) | |
| t = threading.Thread(name=thread_name, target=_worker, | |
| args=(weakref.ref(self, weakref_cb), | |
| self._work_queue, | |
| self._initializer, | |
| self._initargs)) | |
| t.start() | |
| self._threads.add(t) | |
| _threads_queues[t] = self._work_queue | |
| def _initializer_failed(self): | |
| with self._shutdown_lock: | |
| self._broken = ('A thread initializer failed, the thread pool ' | |
| 'is not usable anymore') | |
| # Drain work queue and mark pending futures failed | |
| while True: | |
| try: | |
| work_item = self._work_queue.get_nowait() | |
| except queue.Empty: | |
| break | |
| if work_item is not None: | |
| work_item.future.set_exception(BrokenThreadPool(self._broken)) | |
| def shutdown(self, wait=True, *, cancel_futures=False): | |
| with self._shutdown_lock: | |
| self._shutdown = True | |
| if cancel_futures: | |
| # Drain all work items from the queue, and then cancel their | |
| # associated futures. | |
| while True: | |
| try: | |
| work_item = self._work_queue.get_nowait() | |
| except queue.Empty: | |
| break | |
| if work_item is not None: | |
| work_item.future.cancel() | |
| # Send a wake-up to prevent threads calling | |
| # _work_queue.get(block=True) from permanently blocking. | |
| self._work_queue.put(None) | |
| if wait: | |
| for t in self._threads: | |
| t.join() | |
| shutdown.__doc__ = _base.Executor.shutdown.__doc__ | |