了解在进行缩减时如何分散数据



我正在使用Dask进行复杂的操作。首先,我做了一个约简,生成一个中等大小的df(几个MB(,然后我需要将其传递给每个工作者来计算最终结果,这样我的代码看起来有点像这个

intermediate_result = ddf.reduction().compute()
final_result = ddf.reduction(
chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)

然而,我收到的警告信息看起来像这个

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data)    # bad
big_future = client.scatter(big_data)     # good
future = client.submit(func, big_future)  # good
% (format_bytes(len(b)), s)

我试过做这个

intermediate_result = client.scatter(intermediate_result, broadcast=True)

但这不起作用,因为函数现在将其视为Future对象,而不是它应该是的数据类型。我似乎找不到任何关于如何使用带减少的分散的文档,有人知道如何做到这一点吗?或者我应该忽略警告消息,并按照原样通过中等大小的df?

实际上,最好的解决方案可能不是分散实际结果,而是从一开始就避免计算它。您可以简单地删除.compute(),这意味着所有的计算都在一个阶段完成,结果会自动移动到您需要的地方。

或者,如果你想在阶段之间有一个清晰的边界,你可以使用

intermediate_result = ddf.reduction().persist()

这将启动减少并将其存储在工人身上,而无需将其拉到客户端。您可以选择等待此操作完成,然后再执行下一步。

相关内容

  • 没有找到相关文章

最新更新