Spark究竟如何在执行程序/任务之间重新洗牌RDD,以防执行失败或动态洗牌?



我正在考虑将一组抢占式实例添加到我在 Google Can Dataproc 上运行的 Spark 作业的 Worker 池中,但我试图了解如果其中一些实例被杀死,究竟会发生什么。我目睹了当Spark执行某种本机操作时会发生什么,例如SparkSQL等,并且似乎它设法保留了RDD的"弹性"元素。

但是,它究竟如何处理由自定义编写的函数(如.forEach().forEachPartition()(处理的RDD重新分发呢?如果正在处理此类任务的工人被杀死,究竟会发生什么?

具体来说,想象一下看起来像这样的.forEachPartition()Java 代码:

public void test(JavaRDD<String> RDD)
{
RDD.foreachPartition(new VoidFunction<Iterator<String>>(){
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<String> t) throws Exception
{
Queue<String> elementQ = new LinkedList<>();
while (t.hasNext())
elementQ.offer(t.next());
while(elementQ.size() >0)
{
String curElement = elementQ.remove();
System.out.println("Doing something with element " + curElement);
boolean condition = false;
if(condition)
elementQ.offer(curElement);
}
}});
}

当任务实例最初启动时,它会获取分配给它的RDD的所有元素,并将它们放入队列中。然后,它不断遍历这个队列 - 要么删除已处理的元素,要么根据某些内部逻辑将它们放回原处(如果它们必须等待稍后处理(。

如果运行这些任务之一的工人被杀,究竟会发生什么?最初分配给它的所有RDD元素是否会在其他工作线程的其他任务中重新分配?或者是否有以编程方式标记哪些元素已被"处理",哪些元素处于挂起状态?

我还注意到,如果使用纱线洗牌服务启用动态分配,Spark 在某些时候开始认为某些任务需要很长时间才能完成,并尝试在其他任务中重新分配RDD。这可能非常有用,但同样,这种重新分发究竟是如何发生的,有没有办法在.forEachPartition函数调用中以编程方式控制它?

Spark 中的动态资源分配不会在其他任务中重新分配 RDD - 它会根据工作负载扩展执行程序的数量。

关于使用抢占式实例,如果一个实例被抢占,该实例上的工作将丢失并重新分配给其他实例,这将阻碍作业进度。

相关内容