如何设置foreachBatch的batchId的起点



我面临的问题是,我的流程依赖foreachBatch的batchId来控制准备进入管道第二阶段的内容。因此,只有在第一阶段(批次(完成的情况下,它才会进入第二阶段。

我想保证,如果出现问题,流可以从停止的地方继续。

我们试图通过将所有完成的批添加到delta表来进行一些控制,但是,我找不到设置初始批ID的方法。

试图从您提供的信息中进行分析。可以使用某种自定义检查点。对于每个批次,存储具有批次id和状态列的偏移范围。继续将状态更新为RUNNING/COMPLETED。

如果出现问题,请检查最后一个批处理状态。如果没有完成,请从该偏移量开始,否则从增量偏移量开始。

我想保证,如果出现问题,流可以从停止的地方继续。

这是foreachBatch接收器的checkpointLocation选项,在出现问题时用作预写日志(WAL(。

引用官方文件:

最后,系统通过检查点和预写日志确保端到端的一次容错保证。

然后它在使用检查点从故障中恢复:中说

如果出现故障或故意关闭,您可以恢复上一个查询的上一个进度和状态,并在它停止的地方继续。这是使用检查点和预写日志完成的。您可以配置具有检查点位置的查询,该查询将把所有进度信息(即每个触发器中处理的偏移量范围(和正在运行的聚合(例如快速示例中的字数(保存到检查点位置。该检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时设置为DataStreamWriter中的一个选项。

我认为这正好涵盖了您的用例。


我找不到设置初始批次Id的方法。

这需要在流式查询的checkpointLocation选项中使用具有预期批处理ID的预填充目录。

您可以简单地自己创建必要的文件,并让恢复的流式查询从目录开始。

(我以前从未尝试过,但看起来可行(。

最新更新