我正在开发一个Beam应用程序,该应用程序使用KafkaIO作为输入
KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("bootstrapServers")
.withTopic("topicName")
.withConsumerConfigUpdates(confs)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer((Deserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata();
我正在努力了解commitOffsetsInFinalize()
究竟是如何工作的。
如何完成流媒体作业?管道中的最后一步是自定义DoFn,它将消息写入DynamoDb
。是否有任何方法可以手动调用那里的某个finalize()
方法,以便在每次成功执行DoFn
之后提交偏移?
此外,我很难理解检查点和最终确定之间的关系是什么?如果管道上没有启用检查点,我还能完成并使commitOffsetsInFinalize()
工作吗?
p.s管道现在的情况是,即使使用commitOffsetsInFinalize()
,读取的每个消息,无论下游是否存在故障,都会被提交,从而导致数据丢失。
谢谢!
这里的finalize指的是检查点的finalization,换句话说,当数据已持久提交到Beam的运行时状态时(这样,将重试工作失败/重新分配,而无需再次从Kafka读取此消息(。这并不意味着有问题的数据已经通过了管道的其余部分。