|
from tqdm import tqdm |
|
from concurrent.futures import ProcessPoolExecutor, as_completed |
|
|
|
|
|
def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=0):
    """
    A parallel version of the map function with a progress bar.

    Args:
        array (array-like): The inputs to iterate over. It is materialized into
            a list once, so generators and other one-shot iterables work too.
        function (function): A python function to apply to the elements of
            array. When ``n_jobs != 1`` it is run in worker processes through
            ``ProcessPoolExecutor``, so it and the elements must be picklable.
        n_jobs (int, default=16): The number of worker processes to use. With
            ``n_jobs == 1`` everything runs serially in the current process.
        use_kwargs (boolean, default=False): Whether to consider the elements
            of array as dictionaries of keyword arguments to function.
        front_num (int, default=0): The number of iterations to run serially
            before kicking off the parallel job. This is useful for catching
            bugs, because exceptions raised there propagate immediately.
            Negative values are treated as 0.

    Returns:
        list: [function(array[0]), function(array[1]), ...] in input order.
        In the parallel path, an element whose call raised an exception holds
        that exception instance instead of a result. In the serial paths
        (the front and ``n_jobs == 1``), exceptions propagate to the caller.
    """
    items = list(array)
    # A negative front_num would make items[front_num:] keep only the tail of
    # the input and silently drop everything before it.
    front_num = max(front_num, 0)

    def call(a):
        # Serial-only helper; the parallel path submits `function` directly
        # so that only picklable objects cross the process boundary.
        return function(**a) if use_kwargs else function(a)

    front = [call(a) for a in items[:front_num]]
    rest = items[front_num:]

    if n_jobs == 1:
        return front + [call(a) for a in tqdm(rest)]

    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        if use_kwargs:
            futures = [pool.submit(function, **a) for a in rest]
        else:
            futures = [pool.submit(function, a) for a in rest]

        progress_kwargs = {
            'total': len(futures),
            'unit': 'it',
            'unit_scale': True,
            'leave': True,
        }
        # Drain the futures in completion order purely to drive the progress
        # bar; the results are gathered below in submission order.
        for _ in tqdm(as_completed(futures), **progress_kwargs):
            pass

    # Every future is finished at this point, so no second progress bar is
    # needed; collect the results in input order.
    out = []
    for future in futures:
        try:
            out.append(future.result())
        except Exception as e:
            # Deliberate best-effort: keep the failure in place of the result
            # so one bad element does not discard the others' work.
            out.append(e)
    return front + out
|
|