SPARK:重新处理失败的记录,用新的代码更改来处理这些失败



我有Spark结构化的流式作业,像一样启用了检查点

df.writeStream .option("checkpointLocation", "s3://path/to/bucket/") .forEachWriter(customForEachWriterImp) .start()

foreachwriter是为跳过坏记录而设计的,我们正在构建一个基于AWS Cloudtrail的仪表板来跟踪跳过的坏记录,这样我们就可以进行必要的代码更改和重新部署,但由于这个坏记录的偏移量已经是检查点,Spark不会再次尝试从S3读取这个对象,尽管我们有新的代码来处理那些最初的坏数据。

原因是我们不想错过任何记录的处理,除非s3数据完全不好,否则我们甚至不会重新部署代码更改(并将其作为噪声忽略(。

因此,例如:在S3 json对象record1中,如果根据原始模式,字段a应该是integer,那么Spark的自定义ForEachWriter会将此记录作为坏记录失败,但从逻辑上讲,它不是坏记录,因此我们希望将代码修复为double来处理该字段,它也将适合原始integerdouble,因此我们将重新部署代码。

现在,当我们重新部署时,我们希望重新处理旧的基于double的坏记录,即使它的偏移量已经在S3中被检查点了。

Spark作业运行在亚马逊电子病历上,阅读亚马逊S3。

我知道在检查点后重新处理的唯一方法是在没有检查点的情况下运行,或者设置一个新的空检查点目录。这将重新处理所有内容。

最新更新