# Copyright (C) 2024-present Naver Corporation. All rights reserved.
# Licensed under CC BY-NC-SA 4.0 (non-commercial use only).
#
# --------------------------------------------------------
# modified from DUSt3R
# --------------------------------------------------------
from tqdm import tqdm
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def parallel_threads(
    function, args, workers=0, star_args=False, kw_args=False, front_num=1, Pool=ThreadPool, **tqdm_kw
):
    """tqdm but with parallel execution.

    Will essentially return
        res = [ function(arg)    # default
                function(*arg)   # if star_args is True
                function(**arg)  # if kw_args is True
                for arg in args]

    Note: the first `front_num` elements of args will not be parallelized.
    This can be useful for debugging.
    """
    # workers <= 0 means "use all available CPUs"
    while workers <= 0:
        workers += cpu_count()
    if workers == 1:
        front_num = float("inf")  # run everything sequentially

    # turn args into an iterator; total may be unknown if args has no len()
    try:
        n_args_parallel = len(args) - front_num
    except TypeError:
        n_args_parallel = None
    args = iter(args)

    # sequential execution of the first front_num elements
    front = []
    while len(front) < front_num:
        try:
            a = next(args)
        except StopIteration:
            return front  # end of the iterable
        front.append(
            function(*a) if star_args else function(**a) if kw_args else function(a)
        )

    # parallel execution of the remaining elements
    out = []
    with Pool(workers) as pool:
        if star_args:
            futures = pool.imap(starcall, [(function, a) for a in args])
        elif kw_args:
            futures = pool.imap(starstarcall, [(function, a) for a in args])
        else:
            futures = pool.imap(function, args)
        # report progress as tasks complete
        for f in tqdm(futures, total=n_args_parallel, **tqdm_kw):
            out.append(f)
    return front + out


def parallel_processes(*args, **kwargs):
    """Same as parallel_threads, with processes"""
    import multiprocessing as mp

    kwargs["Pool"] = mp.Pool
    return parallel_threads(*args, **kwargs)


def starcall(args):
    """convenient wrapper for Process.Pool"""
    function, args = args
    return function(*args)


def starstarcall(args):
    """convenient wrapper for Process.Pool"""
    function, args = args
    return function(**args)
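

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: squares a range of
    # integers on 4 worker threads; extra keyword arguments such as `desc` are
    # forwarded to tqdm. The helper `_square` is purely illustrative.
    def _square(x):
        return x * x

    res = parallel_threads(_square, range(10), workers=4, desc="squaring")
    print(res)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]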