我喜欢使用dd.persist((运行异步dask数据帧计算,然后能够跟踪单个分区的状态。目标是以非阻塞的方式访问部分结果。
这里需要的伪代码:
dd = dd.persist()
if dd.partitions[0].__dask_status__ == 'finished':
# Partial non-blocking result access
df = dd.partitions[0].compute()
使用dask futures效果很好,但与单个dd.persist((相比,提交许多单独的分区非常慢,并且每个分区有一个futures会破坏控制面板;组";选项卡,因为显示了太多块。
futures = list(map(client.compute, dd.partitions))
破碎的dask仪表板"组";选项卡
您可能想要的函数是distributed.futures_of
,它列出了集合正在运行的未来。您可以自己检查这个列表,查看未来的状态,或者与distributed.as_completed
和for循环一起使用,在分区可用时对其进行处理。期货的密钥类似于(collection-name, partition-index)
,所以您知道每个属于哪个分区。
dd.partitions[i]
(或用list
循环这些(不起作用的原因是,这会为每个分区创建一个新的图,因此您最终会向调度程序提交比对.persist()
的单个调用多得多的内容。