即使未能发布到 Kafka 流中的输出主题,是否也会提交消费者偏移量?



如果我有一个 Kafka 流应用程序无法发布到主题(因为该主题不存在),它是提交消费者偏移量并继续,还是会循环相同的消息直到它可以解析输出主题?该应用程序仅打印错误,否则从我观察到的情况来看运行良好。

尝试发布到主题时的错误示例:

Error while fetching metadata with correlation id 80 : {super.cool.test.topic=UNKNOWN_TOPIC_OR_PARTITION}

在我看来,它只会在相同的消息上旋转,直到问题得到解决,以免丢失数据?我找不到关于默认行为是什么的明确答案。我们没有将自动提交设置为关闭或类似的东西,大多数设置都设置为默认值。

我问是因为我们不想最终陷入运行状况检查正常的情况(应用程序在打印错误以记录时正在运行),我们只是扔掉了大量的 Kafka 消息。

Kafka Streams 不会为这种情况提交偏移量,因为它提供了至少一次的处理保证(事实上,甚至不可能以不同的方式重新配置 Kafka Streams - 只有更强的恰好一次保证是可能的)。此外,Kafka Streams 始终禁用使用者的自动提交(并且不允许您启用它),因为 Kafka Streams 管理提交偏移量本身。

如果使用默认设置运行,生产者实际上应该抛出异常,相应的线程应亡——如果线程死亡,您可以通过注册KafkaStreams#uncaughtExceptionHandler()来获得回调。

您还可以观察KafkaStreams#state()(或注册回调KafkaStreams#setStateListener())。如果所有线程都死了,状态将变为DEAD(请注意,旧版本中存在一个错误,在这种情况下,状态仍然RUNNING:https://issues.apache.org/jira/browse/KAFKA-5372)

因此,应用程序不应处于正常状态,Kafka Streams 不会重试输入消息,而是停止处理,您需要重新启动客户端。重新启动时,它将重新读取失败的输入消息,并重新尝试写入输出主题。

如果希望 Kafka 流重试,则需要增加创建者配置reties以避免生成器引发异常并在内部重试写入。如果生产者写入缓冲区已满,这可能会"阻止"进一步处理。

最新更新