|
''' |
|
WorkerPool and WorkerBase for handling the common problems in managing |
|
a multiprocess pool of workers that aren't done by multiprocessing.Pool, |
|
including setup with per-process state, debugging by putting the worker |
|
on the main thread, and correct handling of unexpected errors and ctrl-C.
|
|
|
To use it, |
|
1. Put the per-process setup and the per-task work in the |
|
setup() and work() methods of your own WorkerBase subclass. |
|
2. To prepare the process pool, instantiate a WorkerPool, passing your |
|
subclass type as the first (worker) argument, as well as any setup keyword |
|
arguments. The WorkerPool will instantiate one of your workers in each |
|
worker process (passing in the setup arguments in those processes). |
|
If debugging, the pool can have process_count=0 to force all the work |
|
to be done immediately on the main thread; otherwise all the work |
|
will be passed to other processes. |
|
3. Whenever there is a new piece of work to distribute, call pool.add(*args). |
|
The arguments will be queued and passed as worker.work(*args) to the |
|
next available worker. |
|
4. When all the work has been distributed, call pool.join() to wait for all
the work to complete and to terminate all the worker processes.
When pool.join() returns, all the work will have been done.
|
|
|
No arrangement is made to collect the results of the work: for example, |
|
the return value of work() is ignored. If you need to collect the |
|
results, use your own mechanism (filesystem, shared memory object, queue) |
|
which can be distributed using setup arguments. |
|
''' |
|
|
|
from multiprocessing import Process, Queue, cpu_count |
|
import signal |
|
import atexit |
|
import sys |
|
|
|
class WorkerBase(Process):
    '''
    Subclass this class and override its work() method (and optionally,
    setup() as well) to define the units of work to be done in a process
    worker in a worker pool.
    '''
    def __init__(self, i, process_count, queue, initargs):
        if process_count > 0:
            # Ignore ctrl-C in worker processes: shutdown is coordinated by
            # the parent WorkerPool via the None sentinel, not by delivering
            # KeyboardInterrupt to workers.
            # NOTE(review): this executes in the parent before the child is
            # started; children presumably inherit SIG_IGN across fork --
            # confirm behavior on spawn-based platforms.
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.process_id = i                  # worker index, or None when inline (process_count == 0)
        self.process_count = process_count   # total workers; 0 means run on the main thread
        self.queue = queue                   # shared work queue, or None when inline
        super(WorkerBase, self).__init__()
        # Per-process initialization hook; keyword args come from the
        # WorkerPool constructor.
        self.setup(**initargs)
    def run(self):
        # Worker main loop: pull work batches until the None sentinel
        # arrives, then re-queue the sentinel for the next worker and exit.
        while True:
            try:
                work_batch = self.queue.get()
            except (KeyboardInterrupt, SystemExit):
                print('Exiting...')
                break
            if work_batch is None:
                # Pass the shutdown sentinel along so every worker sees it.
                self.queue.put(None)
                return
            self.work(*work_batch)
    def setup(self, **initargs):
        '''
        Override this method for any per-process initialization.
        Keyword args are passed from the WorkerPool constructor.
        '''
        pass
    def work(self, *args):
        '''
        Override this method to perform one unit of work.
        Args are passed from WorkerPool.add() arguments.
        '''
        raise NotImplementedError('worker subclass needed')
|
|
|
class WorkerPool(object):
    '''
    Instantiate this object (passing a WorkerBase subclass type
    as its first argument) to create a worker pool.  Then call
    pool.add(*args) to queue args to distribute to worker.work(*args),
    and call pool.join() to wait for all the workers to complete.
    '''
    def __init__(self, worker=WorkerBase, process_count=None, **initargs):
        '''
        worker: WorkerBase subclass to instantiate in each process.
        process_count: number of worker processes; None uses cpu_count(),
            and 0 runs all work synchronously on the main thread (useful
            for debugging).
        initargs: keyword arguments forwarded to each worker's setup().
        '''
        global active_pools
        if process_count is None:
            process_count = cpu_count()
        if process_count == 0:
            # Zero process_count runs the single worker inline on the main
            # thread, so work() can be stepped through in a debugger.
            self.queue = None
            self.processes = None
            self.worker = worker(None, 0, None, initargs)
            return
        # Ignore ctrl-C while the children are created so they inherit
        # SIG_IGN; only the parent should field the KeyboardInterrupt.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        # Register for best-effort cleanup at interpreter exit (atexit hook).
        active_pools[id(self)] = self
        # Bounded queue applies backpressure so add() cannot race far
        # ahead of the workers.
        self.queue = Queue(maxsize=(process_count * 3))
        # Set first so the attribute exists if a worker constructor raises
        # (the atexit hook may call early_terminate() on this pool).
        self.processes = None
        self.processes = [worker(i, process_count, self.queue, initargs)
                          for i in range(process_count)]
        for p in self.processes:
            p.start()
        # Restore ctrl-C handling in the parent now that the children exist.
        signal.signal(signal.SIGINT, original_sigint_handler)

    def add(self, *work_batch):
        '''
        Queues one unit of work; the args are passed to worker.work(*args)
        on the next available worker (or run immediately when
        process_count=0).
        '''
        if self.queue is None:
            if hasattr(self, 'worker'):
                # Inline mode: do the work right here on the main thread.
                self.worker.work(*work_batch)
            else:
                print('WorkerPool shutting down.', file=sys.stderr)
        else:
            try:
                # Blocks when the queue is full; ctrl-C while blocked
                # triggers a clean early shutdown.
                self.queue.put(work_batch)
            except (KeyboardInterrupt, SystemExit):
                self.early_terminate()

    def join(self):
        '''
        Waits for all queued work to complete and for every worker
        process to exit, then unregisters the pool.
        '''
        if self.queue is not None:
            # A single None sentinel suffices: each worker re-queues it
            # for the next worker before exiting (see WorkerBase.run).
            self.queue.put(None)
            for p in self.processes:
                p.join()
            self.queue = None
        try:
            del active_pools[id(self)]
        except Exception:
            # Already removed, or module globals torn down at interpreter
            # exit; either way there is nothing left to unregister.
            pass

    def early_terminate(self):
        '''
        Shuts the pool down without waiting for queued work to finish.
        '''
        if self.queue is not None:
            try:
                # Best effort to signal workers; the queue may be full
                # or already closed, in which case terminate() below
                # still stops them.
                self.queue.put_nowait(None)
                self.queue = None
            except Exception:
                pass
        if self.processes is not None:
            for p in self.processes:
                p.terminate()
            self.processes = None
        try:
            del active_pools[id(self)]
        except Exception:
            # Already unregistered or interpreter shutting down.
            pass

    def __del__(self):
        if self.queue is not None:
            # Garbage collection is not a reliable cleanup path; warn the
            # caller and do the join anyway.
            print('ERROR: workerpool.join() not called!', file=sys.stderr)
            self.join()
|
|
|
|
|
# Registry of live WorkerPool instances, keyed by id(pool), so that the
# atexit hook below can still shut down worker processes if the program
# exits without calling pool.join().
active_pools = {}


def early_terminate_pools():
    '''Terminates every pool that is still registered as active.'''
    # Snapshot the values: early_terminate() removes entries from
    # active_pools while we iterate.
    for pool in list(active_pools.values()):
        pool.early_terminate()


atexit.register(early_terminate_pools)
|
|
|
|