偶尔,我的kafka流应用程序会因以下错误而终止:
[-StreamThread-4] o.a.k.s.p.i.AssignedStreamsTasks : Failed to commit stream task 0_9 due to the
following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully
committing offsets {my-topic-9=OffsetAndMetadata{offset=5840887122, leaderEpoch=null, metadata=''}}
根据文件,我认为60000ms源自该属性:default.api.timeout.ms。所以我可能只需要增加这个超时时间。但我还有什么其他选择?
我的应用程序运行时有处理保证:exactly_once,为此,我在文档中找到了以下内容:
commit.interval.ms:保存处理器。(注意,如果processing.guarantee被设置为exactly_once,默认值为100,否则默认值为30000。
所以在我的情况下提交间隔很低。为什么它必须如此之低才能准确地显示?我可以增加间隔以减少提交次数,从而缓解这种情况吗?
我还有什么其他选择?
增加超时当然是一种选择。实际上,为了使Kafka Streams对超时异常更有弹性,正在进行一些工作:https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+改进+timeouts+和+retries+in+Kafka+Streams
关于commit.interval.ms
:它被设置为较低,以保持应用程序的端到端延迟较低。只要事务挂起,下游消费者(在"read_committed"
模式下(就无法消费数据,因此在提交事务之前会经历额外的延迟。对于可能有多个重新分区步骤的Kafka Streams应用程序,必须经常提交以保持低延迟。
因此,根据您的延迟要求,您可能无法增加提交间隔。