Kafka Consumer收到的旧消息很少(不是全部)(之前已经处理过)



我们有保留期设置为7天(168小时)的主题。消息是在生产者发送消息时实时消耗的。一切都按预期进行。然而,最近在生产服务器上,作为操作系统补丁的一部分,Devops意外地将时区从PST更改为EST。

在Kafka服务器重新启动后,我们看到消费者消费的旧消息很少(不是全部,而是随机的)。我们要求Devops将其更改回PST并重新启动。这个周末,旧消息再次出现。

我们还没有在较低的环境(开发、QA、阶段等)中看到这个问题。

Kafka版本:Kafka_2.12-0.11.0.2

非常感谢您的帮助。

正在添加更多信息。。。最近,我们的CentOS进行了补丁更新,不知何故,管理员从PST时区更改为EST,并启动了Kafka服务器。。。之后,我们的消费者开始看到偏移量为0的消息。调试后,我发现时区发生了变化,管理员在4天后从EST改回了PST。我们的消息生产者定期在时区更改前后发送消息。时区从EST改回PST后,Kafka服务器重新启动,我看到了下面的警告。

此日志发生在我们从EST返回到PST时:(server.log)〔2018-06-13 18:36:34430〕WARN发现一个损坏的索引文件,由于要求失败:找到损坏的索引,索引文件(/app/kafka_2.12-0.111.02/data/_consumer_offsets-21/0000000000000000 2076.index)的大小为非零,但最后一个偏移量为2076,不大于基准偏移量2076,/app/kafka_2.12-0.111.0.2/data/__consumer_offsets-21/0000000000000002076.index,/app/ka夫ka_2.12-0.11.0.2/data/__consumer_ooffsets-21/00000000000002076.txnindex和重建索引。。。(kafka.log.log)

在时区从EST更改回PST 3天后,我们重新启动了消费者,并再次开始看到偏移量为0的消费者消息。

As on Kafka v2.3.0您可以设置

"enable.auto.commit" : "true",// default is true as well
"auto.commit.interval.ms" : "1000"

这意味着每隔1秒,消费者就会将其Offset提交给Kafka,或者每次从指定的Topic中提取数据时,它都会提交最新的Offset。

因此,一旦您的Kafka Consumer启动并经过1秒,它就永远不会读取消费者收到并提交的消息。此设置不需要重新启动Kafka Server。

我认为这是因为您将在Commit新偏移之前重新启动程序。

管理偏移

对于每个使用者组,Kafka为正在被消费的每个分区维护已提交的偏移量。当使用者处理消息时,不会将其从分区中删除。相反,它只是使用一个称为提交偏移量的过程来更新其当前偏移量。

如果使用者在处理消息之后但在提交其偏移量之前失败,则提交的偏移量信息将不会反映消息的处理情况。这意味着该消息将由该组中要分配分区的下一个使用者再次处理。

自动提交偏移

提交偏移的最简单方法是让Kafka消费者自动执行。这很简单,但与手动提交相比,它提供的控制更少。默认情况下,使用者每5秒自动提交一次偏移。此默认提交每5秒发生一次,与使用者处理消息的进度无关。此外,当使用者调用poll()时,这也会导致从上一次对poll()的调用返回的最新偏移量被提交(因为它可能已被处理)。

如果提交的偏移量超过了对消息的处理,并且存在使用者故障,则可能无法处理某些消息。这是因为处理在提交的偏移量处重新启动,该偏移量晚于失败前要处理的最后一条消息。因此,如果可靠性比简单性更重要,那么通常最好手动提交偏移。

手动提交偏移

如果‍如果enable.auto.commit设置为false,则使用者手动提交其偏移量。它可以同步或异步地执行此操作。一种常见的模式是基于周期性计时器提交最新处理的消息的偏移量。这种模式意味着每个消息至少处理一次,但提交的偏移量永远不会超过正在处理的消息的进度。周期性定时器的频率控制消费者故障后可以重新处理的消息数量。当应用程序重新启动或组重新平衡时,将从上次保存的已提交偏移量中再次检索消息。

提交的偏移量是恢复处理的消息的偏移量。这通常是最近处理的消息的偏移量加一。

我认为这篇文章很有帮助。

最新更新