Kafka Streams 在生产者抛出异常时提交偏移量



在我的Kafka流应用程序中,我有一个处理器,计划每60秒生成一次输出消息。输出消息是根据来自单个输入主题的消息构建的。有时输出消息大于代理上配置的限制(默认为 1MB(。引发异常,应用程序关闭。提交间隔设置为默认值 (60s(。

在这种情况下,我希望在下次运行时,在崩溃前的 60 年代使用的所有消息都将被重新使用。但实际上,这些消息的偏移量是提交的,并且在下次运行时不会再次处理这些消息。

阅读类似问题的答案,在我看来,不应该提交偏移量。当我将提交间隔增加到 120 秒(处理器仍然每 60 秒标点一次(时,它会按预期工作并且不会提交偏移量。

我正在使用默认处理保证,但我也尝试过exactly_once。两者具有相同的结果。从处理器调用context.commit()似乎对问题没有影响。

我在这里做错了什么吗?

Kafka Streams 中Processor的合约是,在返回之前,您已经完全处理了输入记录并forward()所有相应的输出消息process()。 -- 这个合约意味着 Kafka Streams 被允许在process()返回后提交相应的偏移量。

您似乎process()内存中"缓冲"消息,以便稍后发出它们。这违反了本合同。如果要"缓冲"消息,则应将状态存储附加到Processor并将所有这些消息放入存储中(参见 https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#state-stores(。该存储由Kafka Streams为您管理,并且具有容错能力。这样,在发生错误后,状态将被恢复,并且您不会丢失任何数据(即使输入消息未重新处理(。

我怀疑将提交间隔设置为 120 秒是否真的在所有情况下都按预期工作,因为提交发生和调用标点符号之间没有对齐。

其中一些将取决于您使用的客户端以及它是否基于 librdkafka。 一些答案还取决于您如何"循环"轮询"方法。一个典型的示例类似于"自动偏移提交"下的代码,https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html 但这假设轮询循环相当快(100ms + 处理时间(和 1000ms 的auto.commit.timeout.ms(默认值通常为 5000ms(。

如果我没看错你的问题,你似乎每 60 秒消耗一次消息?

需要注意的是,kafka 客户端的行为与调用poll的频率密切相关(一些库会将轮询包装在类似"Consumption"方法中的东西中(。频繁进行民意调查对于在经纪人看来"活着"很重要。如果您不至少每max.poll.interval.ms轮询一次(默认 5 分钟(,您将收到其他异常。 这可能导致客户被踢出他们的消费群体。

无论如何,切中要害...auto.commit.interval.ms只是一个最大值。如果消息已被接受/确认或已使用 StoreOffset,则在轮询时,客户端可以决定更新代理上的偏移量。可能是由于客户端缓冲区大小被命中或其他一些语义。

另一件要看的事情(特别是如果使用基于 librdkafka 的客户端,其他人也有类似的东西(是enable.auto.offset.store(默认为 true(,这将"自动存储提供给应用程序的最后一个消息的偏移量",因此每次轮询/使用来自客户端的消息时,它都会 StoreOffset。如果您还使用 auto.commit ,那么您的偏移量可能会以您意想不到的方式移动。

有关 librdkafka 的完整配置集,请参见 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。

有很多/许多消费/确认的方式。我认为对于您的情况,配置页面上max.poll.interval.ms的评论可能是相关的。

" 注意:建议为长时间处理的应用程序设置 enable.auto.offset.store=false,然后在消息处理后显式存储偏移量(使用 offsets_store((( ">

抱歉,这个"答案"有点啰嗦。我希望有一些线索供你拉动。

最新更新