Streamz/Dask:收集不会等待缓冲区的所有结果



Imports:

from dask.distributed import Client
import streamz
import time

模拟工作负载:

def increment(x):
time.sleep(0.5)
return x + 1

假设我想在本地 Dask 客户端上处理一些工作负载:

if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).gather().sink(print)
for i in range(10):
ps.emit(i)

这按预期工作,但sink(print)当然会强制等待每个结果,因此流不会并行执行。

但是,如果我使用buffer()来允许缓存结果,那么gather()似乎不再正确收集所有结果,解释器在获得结果之前退出。这种方法:

if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
# ^
for i in range(10):          # - allow parallel execution 
ps.emit(i)               # - before gather()

不为我打印任何结果。Python 解释器只是在启动脚本后不久和buffer()发出它的结果之前退出,因此不会打印任何内容

但是,如果主进程被迫等待一段时间,则结果将以并行方式打印(因此它们不会相互等待,而是几乎同时打印(:

if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
for i in range(10):
ps.emit(i)
time.sleep(10)  # <- force main process to wait while ps is working

为什么?我认为gather()应该等待一批 10 个结果,因为buffer()应该在将它们刷新到gather()之前并行缓存 10 个结果。为什么在这种情况下gather()不阻止?

有没有一种很好的方法来检查 Stream 是否仍然包含正在处理的元素,以防止主进程过早退出?

">
  1. 为什么会这样?":因为Dask分布式调度程序(执行流映射器和接收器函数(和python脚本在不同的进程中运行。当"with"块上下文结束时,您的 Dask 客户端将关闭,执行将关闭,然后发送到流的项目才能到达接收器函数。

  2. "有没有一种很好的方法来检查流是否仍然包含正在处理的元素":我不知道。但是:如果你想要的行为是(我只是在这里猜测(一堆项目的并行处理,那么 Streamz 不是你应该使用的,香草 Dask 应该就足够了。

最新更新