使用 python 的multiprocessing.Pool
和imap_unordered
,我正在将一长串缓慢的任务排队给池中的一组工作人员。遍历结果对象,无论结果的提交顺序如何,我都能在结果准备就绪时处理结果,在那里我执行一个额外的逻辑块,该逻辑块无法并行化并包含在imap_unordered
的func
中。
我希望能够列出有多少任务已经"完成"(又名func
执行并返回)并准备就绪,但尚未迭代。
考虑以下伪代码:
def exp_func(i):
time.sleep(i)
print("func", i)
def fun_unparallelable(i):
print("fun_unparallelable", i)
time.sleep(10)
p = multiprocessing.Pool(processes=10)
result_iter = p.imap_unordered(exp_func, range(1000))
done = 0
for result in result_iter:
done += 1
print("done", done)
print("ready", get_pool_ready(...))
fun_unparallelable(i)
由于fun_unparallelable
需要一个恒定的时间,因此在宏伟的计划中预计是微不足道的,但是在高峰时间或经过一长串相对快速的exp_func
调用之后(就像在exp_func
的初始执行中发生的那样),exp_func
已经完成执行但尚未迭代的任务的预期积压。
为了尽可能清楚地说明这一点,以下是自执行以来给定时间的预期输出:
我希望在t == 1
有以下输出:
func 1
done 1
ready 0
然后在t == 9
时,附加输出:
func 2
func 3
func 4
func 5
func 6
func 7
func 8
func 9
最后在t == 11
,将额外打印以下内容:
func 10
done 2
ready 8
您感兴趣的信息没有公共接口。
在您的情况下,imap_unordered
将返回IMapUnorderedIterator
但不保证此返回类型。 在某些情况下,会返回一个生成器(并且文档除了迭代器之外什么都不承诺)。
在返回IMapUnorderedIterator
的情况下,私有属性_items
保存所有可用但尚未迭代的元素。 因此,访问此私有属性,您可以检查len(result_iter._items)
以查找所需的信息(显然它最多仅在检查时正确,因为池可能随时添加新结果,但大概这对您仍然有用)。
您可能需要考虑向多处理库贡献一个补丁,该补丁在公共接口下公开此信息。