对于Kafka Streams,如果我们将commit.internal.ms配置设置为Long.MAX_VALUE,这将有效地避免Kafka Stream自动提交,但只有在调用较低级别的处理器api上下文commit((后才能提交,我们可以控制是否提交?如果是的话,那么在出现问题或消息包含格式错误的情况下,我们如何从主题中删除这些消息?
资源:https://docs.confluent.io/current/streams/index.html
您不能从主题中删除消息。。。Kafka主题是只追加不可变的日志。
还要注意,如果您提交了一条消息,它也不会从主题中删除。Kafka是而不是消息队列——提交偏移量只意味着跟踪日志中的当前进度(即偏移量(,但它不会删除任何消息。(你可以把它想象成一个"光标"。(
因此,如果消息格式不正确,并且您不想处理它,您可以跳过它。实际上,没有理由为这种情况设置大的提交间隔并进行手动提交。