我使用Dask
Bag在特殊集群上运行一些简单的map reduce计算:
import dask.bag as bag
summed_image = bag.from_sequence(my_ids).map(gen_image_from_ids).reduction(sum, sum).compute()
该代码生成一个链式计算,从from_sequence
和gen_image_from_ids
开始映射,然后将所有结果与sum
的结果归一。由于Dask Bag的特性,求和是在多级树中并行完成的。
我的特殊集群设置具有更高的故障率,因为我的工作人员随时可能被杀死,CUP由其他更高级别的进程接管,然后在一段时间后释放。杀死可能每5分钟只在单个节点上发生一次,但我的总减少工作可能需要5分钟以上。
尽管Dask擅长故障恢复,但我的工作有时永远不会结束。考虑一下,如果作业树中的任何内部节点被杀死,那么之前所有计算的临时中间结果都会丢失。计算应该从头开始。
Dask Future对象有复制,但我在更高级别的Dask Bag或Dataframe上找不到类似的功能来确保数据弹性。请告诉我是否有一种常见的处理方法可以在具有超高失败率的Dask集群中保持中间结果。
更新-我的解决方法
也许任何分布式计算系统都会遭受频繁的故障,即使系统可以从中恢复。在我的情况下,工作程序关闭本质上不是系统故障,而是由更高阶的进程触发的。因此,高级进程现在不再直接杀死我的工作人员,而是在开始运行时启动一个小的python脚本来发送retire_worker((命令。
如文档所示,通过retire_worker()
调度程序将数据从退休的工作人员移动到另一个可用的工作人员。所以我的问题暂时解决了。然而,我仍然保留这个问题,因为我认为复制、冗余计算将是一个更快的解决方案,并且可以更好地使用集群中的空闲节点。
这可能不是您想要的解决方案,但有一种选择是将任务序列划分为足够小的批次,以确保任务及时完成(或快速从头开始重新完成(。
也许是这样的:
import dask.bag as db
from toolz import partition_all
n_per_chunk = 100 # just a guess, the best number depends on the case
tasks = list(partition_all(n_per_chunk, my_ids))
results = []
for t in tasks:
summed_image = (
db
.from_sequence(my_ids)
.map(gen_image_from_ids)
.reduction(sum, sum)
.compute()
)
results.append(summed_image)
summed_image = sum(results) # final result
关于在失败时重新启动工作流(或可能并行启动较小的任务(,还有其他事情需要记住,但希望这能为您提供一个可行解决方案的起点。
更新:稍后会有更多的试验——这个答案并不理想,因为client.replicate()
命令正在阻塞。我怀疑它需要在制作复制品之前完成所有的期货——这是不需要的,因为1。任何中间节点都可以在所有节点就绪之前断开连接,以及2。它防止其他任务异步运行。我需要其他方法来制作复制品。
经过大量的试验,我找到了一种方法,可以在链式计算过程中复制中间结果,以实现数据冗余。请注意,并行reduction
功能是DaskBag
功能,它不直接支持replicate
功能。然而,正如Dask文档所述,可以复制低级DaskFuture
对象以提高恢复能力。
根据@SultanOrazbayev的帖子手动执行部分和,使用persist((函数将部分和保持在集群内存中,如注释中所示,返回的项目本质上是DaskFuture
:
import dask.bag as db
from dask.distributed import futures_of
from toolz import partition_all
n_per_chunk = 100 # just a guess, the best number depends on the case
tasks = list(partition_all(n_per_chunk, my_ids))
bags = []
for t in tasks:
summed_image = (
db
.from_sequence(my_ids)
.map(gen_image_from_ids)
.reduction(sum, sum)
.persist()
)
bags.append(summed_image)
futures = futures_of(bags) # This can only be called on the .persist() result
然后,我可以复制这些远程中间偏和,并对sum
期货感到更安全,以获得最终结果:
client.replicate(futures, 5) # Improve resiliency by replicating to 5 workers
summed_image = client.submit(sum, futures).result() # The only line that blocks for the final result
在这里,我觉得5的副本对我的集群来说是稳定的,尽管更高的值会导致更高的网络开销来在工作人员之间传递副本。
这是有效的,但可能会得到改进,比如如何对中间结果执行并行归约(求和(,尤其是当有很多任务时。请把你的建议留给我。