如何在执行 Dask 任务之间保留一些 Python 对象状态?



我有一个Dask工作线程集群,我想用它来使用复杂的模型并行化预测操作。模型文件很大,加载需要时间,因此我使用 client.run 让所有工作线程运行初始化函数来加载此模型。

如何从client.run函数中保留 Python 变量状态,以便我可以在将来的任务操作中引用它并使用它?

我找到了dask.distributed.get_workerworker.data字典,并用它来设置任意值,然后我可以在map_partition函数中访问,但不确定这是最好还是最安全的选择。

如果一个工作线程死亡并重新启动,或者如果其他工作线程加入集群,有没有办法让这些工作线程自动调用我最初传递给client.run的相同函数?

只使用期货

如果您的模型/状态不变,那么我可能会使用client.scatter将其发送出去,并让 Dask 根据需要复制它。 这是最简单的方法,也是最强大的方法。 如果有新工人到达,那么它将根据需要复制它。

是的,使用get_worker是有意义的

但是,如果您想自己管理状态,那么是的,运行一个函数,获取一些状态并将其附加到worker是一个好主意:

get_worker().my_special_state = x

我不建议将数据放入get_worker().data,因为这是 Dask 管理自己的内存的地方。 看到里面有它不知道的外来东西可能会感到困惑。 事情应该没事,但你永远不知道。

工作线程插件

如果一个工作线程死亡并重新启动,或者如果其他工作线程加入集群,有没有办法让这些工作线程自动调用我最初传递给 client.run 的相同函数?

是的,这里最简单的方法是使用预加载脚本或工作线程插件。 请参阅 https://docs.dask.org/en/latest/setup/custom-startup.html

最新更新