当 Dask 任务运行多次时,使用哪个结果?



首先,阅读这个问题: 使用分布式 Dask 调度程序重复执行任务

现在,当 Dask 由于工作线程窃取或任务失败(例如,由于每个进程的内存限制(而决定重新运行任务时,哪个任务结果会传递到 DAG 的下一个节点?我们正在使用嵌套任务,例如

@dask.delayed
def add(n):
return n+1
t_a = add(1)
t_b = add(t_a)
the_output = add(add(add(t_b)))

因此,如果其中一个任务失败或被盗,并且运行了两次,哪个结果会传递到 DAG 中的下一个节点?

感兴趣的更多背景资料: 出现这种情况的原因是我们的任务写入数据库。如果它运行两次,我们会收到完整性错误,因为它尝试插入同一记录两次(限制在id和组合version(。当前的计划是通过捕获任务中的完整性错误来使任务幂等,但我仍然不明白 Dask 如何"选择"结果。

如果你遇到像add(add(add(t_b)))这样的情况

或更一般

x = add(1)
y = add(x)
z = add(y)

尽管它们都使用相同的功能,但它们都是单独的任务。 Dask 看到他们有不同的输入,因此它对待他们的方式不同。

因此,如果其中一个任务失败或被盗,并且运行了两次,哪个结果会传递到 DAG 中的下一个节点?

在所有这些情况下,群集上一次只有一个有效结果。 被盗任务仅在新计算机上运行,而不在旧计算机上运行。 如果任务的结果丢失并且必须重新运行,则只有新值将出现在任何地方(请记住,旧值已丢失(。

最新更新