我有一个Flink流应用程序,它从一个有3个分区的Kafka主题中消费数据。尽管应用程序一直在运行和工作,没有任何明显的错误,但我在所有3个分区上的flink应用程序的消费者组中看到了延迟。
./kafka-consumer-groups.sh --bootstrap-server $URL --all-groups --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group-1 topic-test 0 9566 9568 2 - - -
group-1 topic-test 1 9672 9673 1 - - -
group-1 topic-test 2 9508 9509 1 - - -
如果我发送新记录,它们会被处理,但延迟仍然存在。我试图查看分区0的最后几条记录,这是我得到的(提交消息部分)-
./kafka-console-consumer.sh --topic topic-test --bootstrap-server $URL --property print.offset=true --partition 0 --offset 9560
Offset:9560
Offset:9561
Offset:9562
Offset:9563
Offset:9564
Offset:9565
日志结束偏移值为9568和当前偏移量为9566。为什么这些补偿在主机用户中不可用,为什么存在这种延迟?
有几个例子,我注意到缺失的偏移量。例子——
Offset:2344
Offset:2345
Offset:2347
Offset:2348
为什么偏移量从2345跳到2347(跳过2346)?这是否与制作人针对主题的写作方式有关?
您可以为创建时添加的任何类型的配置描述主题。如果通过log.cleanup启用了日志压缩。Policy =compact,那么在运行时的行为将会有所不同。您可以看到这些延迟,由于日志压缩,延迟值设置或丢失的偏移量可能是由于使用键生成的消息但值为空。
配置日志清理工具
-
日志清理器默认开启。这将启动cleaner线程池。要在特定主题上启用日志清理,请添加日志特定属性log.cleanup.policy=compact.
-
log.cleanup.policy属性是在代理服务器中定义的代理配置设置。属性文件;它影响集群中没有配置覆盖的所有主题。日志清理器可以配置为保留最少数量的未压缩的"头"。原木的。这可以通过设置压缩时间滞后log.cleaner.min. compression .lag.ms.
来启用。 -
这可以用来防止小于最小消息年龄的消息被压缩。如果没有设置,所有的日志段都有资格进行压缩,除了最后一个段,即当前正在写入的那个。即使活动段的所有消息都比最小压缩时间延迟早,也不会对其进行压缩。
-
日志清理器可以配置为确保最大延迟,之后未压缩的"头";log.cleaner.max.compact .lag.ms.
延迟是基于Kafka消费者提交的最新偏移量计算的(lag=最新偏移量-最新偏移量提交)。一般来说,Flink在执行检查点时提交Kafka偏移量,所以如果使用消费者组命令检查它,总是会有一些延迟。
这并不意味着Flink没有消耗和处理主题/分区中的所有消息,它只是意味着它仍然没有提交它们。