如何使用恰好一次模式实现处理器 API



我正在研究Kafka Stream并使用处理器API来实现我的用例。下面的代码显示了 Process 方法,该方法在调用commit之前将消息转发到下游并中止。这会导致重新处理流,并在接收器上复制消息。

public void process(String key, String value) {
context.forward(key, value);
.. 
..
//killed
context.commit();
}

处理保证参数:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

有没有办法仅在调用语句时应用转发commit。如果不是,实现恰好一次模式的正确方法是什么?

谢谢

确保接收器处于使用者模式read_committed以便它只会看到提交的消息。如果在事务中止之前将消息写入输出主题,则在中止时,消息仍然存在,只是未在提交时标记。第二次通过事务完成,以便将消息和提交标记添加到输出主题。如果您在未处于read_committed模式下阅读,则您将看到所有消息(包括未提交的消息(,并且它可能显示为重复,因为您看到中止的结果和提交的结果。

从 0.11 javadoc 这里 https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

事务是在 Kafka 0.11.0 中引入的,其中应用程序可以 以原子方式写入多个主题和分区。为了这个 要正常工作,应配置从这些分区读取的使用者 仅读取已提交的数据。这可以通过设置 isolation.level=read_committed 在使用者的配置中。

在read_committed模式下,使用者将仅读取那些 已成功提交的事务性消息。它将 像以前一样继续阅读非事务性消息。没有 read_committed模式下的客户端缓冲。相反,结束偏移 的分区对于read_committed使用者将是 分区中属于打开事务的第一条消息。 此偏移称为"最后稳定偏移"(LSO(。

read_committed使用者只会读取 LSO 并过滤掉 任何已中止的事务性消息。伦敦交警官也 影响 seekToEnd(Collection( 的行为,并且 read_committed使用者的 endOffsets(Collection(,其详细信息 在每个方法的文档中。最后,抓取延迟指标是 还调整为相对于read_committed消费者的 LSO。 包含事务性消息的分区将包括提交或中止 指示交易结果的标记。有标记是 未返回到应用程序,但在日志中具有偏移量。作为一个 结果,应用程序从具有事务性消息的主题中读取 将看到消耗的偏移量中的间隙。这些丢失的消息将是 交易标记,它们在 两个隔离级别。此外,使用read_committed的应用程序 消费者也可能看到由于中止交易而导致的差距,因为这些 消息不会由使用者返回,但会 有效偏移。

你可能想把context.commit((包装在finally block下,以确保它被调用。但是,您还需要确保在成功处理后确实调用了它。

最新更新