Kafka Stream-如何在自动提交关闭时处理错误



对于Kafka Streams,如果我们将commit.internal.ms配置设置为Long.MAX_VALUE,这将有效地避免Kafka Stream自动提交,但只有在调用较低级别的处理器api上下文commit((后才能提交,我们可以控制是否提交?如果是的话,那么在出现问题或消息包含格式错误的情况下,我们如何从主题中删除这些消息?

资源:https://docs.confluent.io/current/streams/index.html

您不能从主题中删除消息。。。Kafka主题是只追加不可变的日志。

还要注意,如果您提交了一条消息,它也不会从主题中删除。Kafka是而不是消息队列——提交偏移量只意味着跟踪日志中的当前进度(即偏移量(,但它不会删除任何消息。(你可以把它想象成一个"光标"。(

因此,如果消息格式不正确,并且您不想处理它,您可以跳过它。实际上,没有理由为这种情况设置大的提交间隔并进行手动提交。

相关内容

  • 没有找到相关文章

最新更新