Kafka主题分区缺失偏移量



我有一个Flink流应用程序,它从一个有3个分区的Kafka主题中消费数据。尽管应用程序一直在运行和工作,没有任何明显的错误,但我在所有3个分区上的flink应用程序的消费者组中看到了延迟。

./kafka-consumer-groups.sh --bootstrap-server $URL --all-groups --describe

GROUP     TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
group-1   topic-test 0          9566            9568            2               -               -               -
group-1   topic-test 1          9672            9673            1               -               -               -
group-1   topic-test 2          9508            9509            1               -               -               -

如果我发送新记录,它们会被处理,但延迟仍然存在。我试图查看分区0的最后几条记录,这是我得到的(提交消息部分)-

./kafka-console-consumer.sh --topic topic-test --bootstrap-server $URL --property print.offset=true --partition 0 --offset 9560
Offset:9560
Offset:9561
Offset:9562
Offset:9563
Offset:9564
Offset:9565

日志结束偏移值为9568当前偏移量为9566。为什么这些补偿在主机用户中不可用,为什么存在这种延迟?

有几个例子,我注意到缺失的偏移量。例子——

Offset:2344
Offset:2345
Offset:2347
Offset:2348

为什么偏移量从2345跳到2347(跳过2346)?这是否与制作人针对主题的写作方式有关?

您可以为创建时添加的任何类型的配置描述主题。如果通过log.cleanup启用了日志压缩。Policy =compact,那么在运行时的行为将会有所不同。您可以看到这些延迟,由于日志压缩,延迟值设置或丢失的偏移量可能是由于使用键生成的消息但值为空。

配置日志清理工具

  • 日志清理器默认开启。这将启动cleaner线程池。要在特定主题上启用日志清理,请添加日志特定属性log.cleanup.policy=compact.

  • log.cleanup.policy属性是在代理服务器中定义的代理配置设置。属性文件;它影响集群中没有配置覆盖的所有主题。日志清理器可以配置为保留最少数量的未压缩的"头"。原木的。这可以通过设置压缩时间滞后log.cleaner.min. compression .lag.ms.

    来启用。
  • 这可以用来防止小于最小消息年龄的消息被压缩。如果没有设置,所有的日志段都有资格进行压缩,除了最后一个段,即当前正在写入的那个。即使活动段的所有消息都比最小压缩时间延迟早,也不会对其进行压缩。

  • 日志清理器可以配置为确保最大延迟,之后未压缩的"头";log.cleaner.max.compact .lag.ms.

延迟是基于Kafka消费者提交的最新偏移量计算的(lag=最新偏移量-最新偏移量提交)。一般来说,Flink在执行检查点时提交Kafka偏移量,所以如果使用消费者组命令检查它,总是会有一些延迟。

这并不意味着Flink没有消耗和处理主题/分区中的所有消息,它只是意味着它仍然没有提交它们。

最新更新