我正在使用Dask进行复杂的操作。首先,我做了一个约简,生成一个中等大小的df(几个MB(,然后我需要将其传递给每个工作者来计算最终结果,这样我的代码看起来有点像这个
intermediate_result = ddf.reduction().compute()
final_result = ddf.reduction(
chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)
然而,我收到的警告信息看起来像这个
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s)
我试过做这个
intermediate_result = client.scatter(intermediate_result, broadcast=True)
但这不起作用,因为函数现在将其视为Future对象,而不是它应该是的数据类型。我似乎找不到任何关于如何使用带减少的分散的文档,有人知道如何做到这一点吗?或者我应该忽略警告消息,并按照原样通过中等大小的df?
实际上,最好的解决方案可能不是分散实际结果,而是从一开始就避免计算它。您可以简单地删除.compute()
,这意味着所有的计算都在一个阶段完成,结果会自动移动到您需要的地方。
或者,如果你想在阶段之间有一个清晰的边界,你可以使用
intermediate_result = ddf.reduction().persist()
这将启动减少并将其存储在工人身上,而无需将其拉到客户端。您可以选择等待此操作完成,然后再执行下一步。