cluster.adapt()在将工作人员的内存数据移动到其他工作人员之前杀死他们



我正在使用带有Slurm集群的Dask:

cluster = SLURMCluster(cores=64, processes=64, memory="128G", walltime="24:00:00")
#export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100
cluster.adapt(minimum_jobs=1, maximum_jobs=2, interval="20 s", target_duration="100 s", wait_count=20)

我的工作负载从大约1000个节点减少到1个,减少了两个节点。每次还原大约需要2分钟。因此,开始时可能有平行,但结束时不太平行。我只能访问群集中的两个节点。所以我希望它在开始时使用两个集群节点,在结束时使用一个集群节点。

# pseudo code
def reduce(task):
futures = []
for i in range(0, len(task), 2):
futures.append(client.submit(reduceTwo(), task[i]. task[i+1]))
while len(futures) != 1:
futures_new = []
for i in range(0, len(futures), 2):
futures_new.append(client.submit(reduceTwo(), futures[i].result(), futures[i+1].result()))
futures = futures_new
return futures[0].result()

然而,我的问题是,当cluster.adapt((预期从2个集群节点减少到1个集群节点时,它将首先减少到0并启动一个新节点。

问题1:降到0是否正常如果能够正确保存被杀死节点内存中的输出数据(可能保存在调度器节点,即集群的登录节点中(,这实际上不会是一个问题。然而,我读了日志,似乎在工人正常停止工作/退休之前,它被杀得太早了。有些工人退休了,有些没有。

问题2:这是"退役前杀死";可能发生吗?有没有办法让工人有更长的退休时间。你可以在上面的第一段代码中看到,我试图增加尽可能多的定时参数,但它不起作用。我不完全理解这个参数列表。

我知道我可以优化我的代码。像del futures一样,我们完成计算,这样工人的内存阶段任务将为0,并且它的死亡不会导致太多的计算需要重做。或者,可以有相同的还原库可以使用。但是,无论如何,这两个达斯克问题能解决吗?

回答我自己的问题,以防其他人看到同样的问题。

关键是:不要让所有节点都将其临时文件保存到同一共享磁盘中的同一目录中。

在不指定local_directory的情况下,所有节点都很容易将worker本地文件保存到~/dask-worker-space目录中,该目录在所有节点之间共享。然后,所有节点之间都有在该目录中读/写的竞争。然后,当一个节点想要杀死它的工人时,它可能会意外地杀死其他节点中的工人,最终(Q1(节点数量减少到0。而且(Q2(未能移动被杀工人的数据

我希望Dask可以支持所有节点写入同一个Dask工作空间。这真的是大自然的行为,我的意思是,当我只想快速使用Dask来做一些类似的事情时,我的直觉不会告诉我:"设置local_directory,否则程序将崩溃;。

相关内容

最新更新