多处理池:显示准备好的未处理结果的数量



使用 python 的multiprocessing.Poolimap_unordered,我正在将一长串缓慢的任务排队给池中的一组工作人员。遍历结果对象,无论结果的提交顺序如何,我都能在结果准备就绪时处理结果,在那里我执行一个额外的逻辑块,该逻辑块无法并行化并包含在imap_unorderedfunc中。

我希望能够列出有多少任务已经"完成"(又名func执行并返回)并准备就绪,但尚未迭代。

考虑以下伪代码:

def exp_func(i):
time.sleep(i)
print("func", i)
def fun_unparallelable(i):
print("fun_unparallelable", i)
time.sleep(10)
p = multiprocessing.Pool(processes=10)
result_iter = p.imap_unordered(exp_func, range(1000))
done = 0
for result in result_iter:
done += 1
print("done", done)
print("ready", get_pool_ready(...))
fun_unparallelable(i)

由于fun_unparallelable需要一个恒定的时间,因此在宏伟的计划中预计是微不足道的,但是在高峰时间或经过一长串相对快速的exp_func调用之后(就像在exp_func的初始执行中发生的那样),exp_func已经完成执行但尚未迭代的任务的预期积压。

为了尽可能清楚地说明这一点,以下是自执行以来给定时间的预期输出:

我希望在t == 1有以下输出:

func 1
done 1
ready 0

然后在t == 9时,附加输出:

func 2
func 3
func 4
func 5
func 6
func 7
func 8
func 9

最后在t == 11,将额外打印以下内容:

func 10
done 2
ready 8

您感兴趣的信息没有公共接口。

在您的情况下,imap_unordered将返回IMapUnorderedIterator但不保证此返回类型。 在某些情况下,会返回一个生成器(并且文档除了迭代器之外什么都不承诺)。

在返回IMapUnorderedIterator的情况下,私有属性_items保存所有可用但尚未迭代的元素。 因此,访问此私有属性,您可以检查len(result_iter._items)以查找所需的信息(显然它最多仅在检查时正确,因为池可能随时添加新结果,但大概这对您仍然有用)。

您可能需要考虑向多处理库贡献一个补丁,该补丁在公共接口下公开此信息。

相关内容

  • 没有找到相关文章

最新更新