我知道这听起来像是以前有人问过的事情,但是等等,我会解释为什么其他选项不起作用。
我目前正在使用multiprocessing.Pool
在应用程序中实现并行性,并希望对其进行扩展以能够利用嵌套并行性。仅将Pool
对象作为参数传递给apply_async
的幼稚方法并不像其他答案中所述那样有效,因为Pool
不能被腌制。
以下是我的要求:
-
我需要某种池来限制并发执行任务的数量。 例如
multiprocess.Pool
用于此目的,但不能传递给其他进程。 -
我需要嵌套并行性。在我的应用程序中,我需要执行 I/O 以确定嵌套工作是什么,所以我绝对不想从单个线程执行此操作。我认为这排除了这个问题的所有答案。
-
它需要在标准库中;我无法添加依赖项。这就排除了这个答案。
-
我真的很希望它同时适用于Python 2和3。但是,如果可以证明迁移到Python 3可以解决我的问题,我会考虑的。
我不需要它来专门使用多个进程,使用线程是可以的,因为大部分工作都是 I/O 或等待子进程完成。
我尝试使用multiprocessing.dummy
,这是相同的接口,但在threading
之上实现。但是,当我尝试调用get()
来检索我的测试结果时,我收到以下错误,所以我认为这已经出来了。
File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
ValueError: signal only works in main thread
我知道 Python 3 中的concurrent.futures
库,但这似乎有一些严重的限制。例如,在我的情况下,本节中的第二个示例似乎是一个表演障碍:
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
我不明白你如何避免使用基本上任何直接编写的嵌套并行算法来解决这个问题。因此,即使我愿意使用 Python 3,我认为这也不是一个入门。
我不知道标准库中有任何其他可用的选项,而无需编写自己的实现。
你似乎已经排除了,但我怀疑 https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor,或者 https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor,如果你能够迁移到Python 3,或者为Python 2添加依赖项。
如果在处理每个文件之前不必触发该文件的额外工作,则可以使用单个协调线程来触发所有其他线程,从而可以防止死锁,如以下示例所示。
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(max_workers=3)
def find_work_inputs(dummy_file):
print("{}: Finding work...".format(dummy_file))
time.sleep(1)
work = range(0, dummy_file)
print("{}: Work is {}".format(dummy_file, work))
return work
def do_work(dummy_file, work_input):
print("{}: {}".format(dummy_file, work_input))
print("{}: Doing work {}...".format(dummy_file, work_input))
time.sleep(1)
return work_input * work_input
dummy_files = [1,2,3,4,5]
futures = []
for dummy_file in dummy_files:
work_inputs = pool.submit(find_work_inputs, dummy_file)
for work_input in work_inputs.result():
result = work_input
futures.append((dummy_file, result, pool.submit(do_work, dummy_file, result)))
for dummy_file, work_input, future in futures:
print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
或者,如果第一层的每个线程都需要自己触发工作,则可能需要将额外的工作放在另一个池中以防止死锁(取决于每个将来调用result()
的时间),如下所示。
from concurrent.futures import ThreadPoolExecutor
import time
find_work_pool = ThreadPoolExecutor(max_workers=3)
do_work_pool = ThreadPoolExecutor(max_workers=3)
def find_work_inputs(dummy_file):
print("{}: Finding work...".format(dummy_file))
time.sleep(1)
work = range(0, dummy_file)
print("{}: Work is {}".format(dummy_file, work))
futures = []
for work_input in work:
futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input)))
return futures
def do_work(dummy_file, work_input):
print("{}: {}".format(dummy_file, work_input))
print("{}: Doing work {}...".format(dummy_file, work_input))
time.sleep(1)
return work_input * work_input
dummy_files = [1,2,3,4,5]
futures = []
for dummy_file in dummy_files:
futures.extend(find_work_pool.submit(find_work_inputs, dummy_file).result())
for dummy_file, work_input, future in futures:
print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))