在Spark中的驱动程序上捕获Dataset foreachPartition()函数中抛出的异常



我正试图找到一种方法来捕获Spark在其驱动程序的foreachPartition((方法中引发的异常。在数据集上使用foreachPartition((时,我传递一个lambda函数,该函数在各个工作线程之间执行。此进程可能引发异常。但我想不出一种方法可以在司机身上发现这个异常。看来这是故意的。我能做些什么来改变这一点吗?下面是我目前正在做的一个例子:

public static void driverClassExecute() {
Dataset<ModelDTO> dataset = getSomeData();
dataset.foreachPartition(AClass::methodCanThrowException);
//How can I recover if the above throws an exception?
}
public static void methodCanThrowException(Iterator<ModelDTO> it) throws Exception {
//do stuff. If bad, throw exception. This crashes the driver.
throw new Exception("any exception");
}

我还使用Eclipse Oxygen IDE,以防编译器有问题。

在这种情况下,foreachPartition将抛出一个异常,因此您可以将该调用封装在try-catch中,并将其作为任何其他异常进行处理,尽管spark作业已经失败。为了避免作业失败,您必须在methodCanThrowException中处理异常。

我认为您的做法可能有点不同,但Spark分区DS的典型执行步骤如下:

  1. 创建数据集==>这已经通过getSomeData((函数完成了。

  2. 使用dataset.foreachPartition((==>在分区上迭代。请记住,这个函数不会给你一条记录,它会给你整个分区,根据你在getSomeData((中的拆分逻辑,这个分区可能有很多记录。

  3. 通过partition.forEachRemaining((进行下一级迭代,由执行器获取单个记录并运行适当的业务逻辑。

当你运行#3步骤时,你应该在这里传递每个记录你的方法CanThrowException((,就像在Java-中一样

partition.forEachRemaining(record -> {
try {
methodCanThrowException(record);
} catch (Exception e){
e.printStackTrace();
Logger.info(<record-key> "etc for debugging or tracking");
}
);

显然,您可以通过多种方式处理异常,以写入审核文件进行更正等。但您的处理不会因为处理方法中的异常而失败。

最新更新