Kafka consumer does not fetch new records when using a topic pattern and large messages



I hope one of you can help me.

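I am using Spring Boot 2.3.4 with Spring Kafka 2.5.6. I recently had to reset the offsets and saw some strange behavior: the messages are consumed, but after every X messages (X varies) there is a 10-second pause before consumption continues.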

This is my configuration:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      heartbeat-interval: 1000
      max-poll-records: 50
      group-id: kafka-fetch-demo
      fetch-max-wait: 10000
    listener:
      type: single
      concurrency: 1
      poll-timeout: 1000
      no-poll-threshold: 2
      monitor-interval: 10
      ack-mode: manual
    producer:
      acks: all
      batch-size: 0
      retries: 0

This is the sample listener code:

@KafkaListener(id = LISTENER_ID, idIsGroup = false, topicPattern = "#{demoProperties.getTopicPattern()}")
public void onEvent(Acknowledgment acknowledgment, ConsumerRecord<byte[], String> record) {
    log.info("Received record on topic {}, partition {} and offset {}",
            record.topic(),
            record.partition(),
            record.offset());
    acknowledgment.acknowledge();
}

I found out that the 10-second pause comes from the fetch.max.wait.ms property. However, I do not understand why this property applies here.

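As far as I know, the fetch-max-wait property only determines the maximum time the broker waits before returning records to the consumer when fetch.min.bytes has not yet been reached. (In my case fetch.min.bytes is left at its default of 1, so it should always be satisfied.) In addition, my analysis shows that the problem only occurs when a topic pattern is combined with "larger" messages.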
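
The semantics described above can be pictured with a deliberately simplified model. This is only a sketch, not broker code: `FetchWaitModel` and `holdTimeMs` are hypothetical names, and the model assumes that no new data arrives while the request is parked. It shows that a fetch is answered immediately once fetch.min.bytes is satisfied, and is held for up to fetch.max.wait.ms when nothing is available for the requested partitions.

```java
/** Simplified, hypothetical model of how long a broker holds a fetch request. */
final class FetchWaitModel {

    /**
     * @param availableBytes bytes ready for the requested partitions when the fetch arrives
     * @param fetchMinBytes  value of fetch.min.bytes (client default: 1)
     * @param fetchMaxWaitMs value of fetch.max.wait.ms (10000 in the question)
     * @return modelled hold time in milliseconds, assuming no data arrives in the meantime
     */
    static long holdTimeMs(long availableBytes, int fetchMinBytes, long fetchMaxWaitMs) {
        if (availableBytes >= fetchMinBytes) {
            // Enough data is already there: the response is sent right away.
            return 0L;
        }
        // Not enough data: the request is held until the wait limit expires.
        return fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        // One 17.1 KB message waiting: no hold expected in this model.
        System.out.println(holdTimeMs(17_510, 1, 10_000));
        // Nothing to read on the requested partitions: full hold.
        System.out.println(holdTimeMs(0, 1, 10_000));
    }
}
```

Under this model, a full 10-second wait is only expected for a fetch that covers partitions with no unread data.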

Reproduction

I uploaded a demo application to GitHub that reproduces the problem: https://github.com/kraennix/kafka-fetch-demo

How I reproduced it:

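  1. I put one thousand messages on a Kafka topic, each 17.1 KB in size.
  2. I started my consumer application, which subscribes to this topic through a topic pattern. You can then observe the stalling behavior.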
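
To get a feel for the sizes involved in these steps, here is a rough sketch that builds a payload of the stated size and estimates how many such messages fit into one partition fetch. `ReproPayload` and its methods are hypothetical names. The sketch assumes that 17.1 KB means 17.1 * 1024 bytes, that max.partition.fetch.bytes is left at the client default of 1 MiB (the question does not state this setting), and it ignores record and batch overhead.

```java
import java.util.Arrays;

/** Hypothetical helper for sizing the test data used in the reproduction steps. */
final class ReproPayload {

    static final int MESSAGE_COUNT = 1_000;
    // 17.1 KB taken as 17.1 * 1024 bytes, rounded down (assumption about the unit).
    static final int MESSAGE_BYTES = 17_510;
    // Client default for max.partition.fetch.bytes; assumed, not stated in the question.
    static final int MAX_PARTITION_FETCH_BYTES = 1_048_576;

    /** Builds an ASCII string of the given length (one byte per character in UTF-8). */
    static String buildPayload(int sizeBytes) {
        char[] chars = new char[sizeBytes];
        Arrays.fill(chars, 'x');
        return new String(chars);
    }

    /** Total payload volume, ignoring record and batch overhead. */
    static long totalBytes(int count, int messageBytes) {
        return (long) count * messageBytes;
    }

    /** Whole messages that fit under the per-partition fetch limit, ignoring overhead. */
    static int wholeMessagesPerFetch(int maxFetchBytes, int messageBytes) {
        return maxFetchBytes / messageBytes;
    }

    public static void main(String[] args) {
        System.out.println(buildPayload(MESSAGE_BYTES).length());
        System.out.println(totalBytes(MESSAGE_COUNT, MESSAGE_BYTES));
        System.out.println(wholeMessagesPerFetch(MAX_PARTITION_FETCH_BYTES, MESSAGE_BYTES));
    }
}
```

Under these assumptions a single partition fetch carries only a few dozen messages, so the consumer has to issue many fetch requests to work through the backlog.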

Note: If I do this with "small" [...]

In the log you can see the successful commit, but right after it the consumer reports that it is skipping the fetch:

2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Sending OffsetCommit request with {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}} to coordinator localhost:9092 (id: 2147483647 rack: null)
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Using older server API v7 to send OFFSET_COMMIT {group_id=kafka-fetch-demo,generation_id=4,member_id=consumer-kafka-fetch-demo-1-cf8e747f-531d-457a-aca8-18960c518ef9,group_instance_id=null,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,committed_offset=488,committed_leader_epoch=-1,committed_metadata=}]}]} with correlation id 62 to node 2147483647
2021-01-16 15:04:40.778 TRACE 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 62, received {throttle_time_ms=0,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,error_code=0}]}]}
2021-01-16 15:04:40.779 DEBUG 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Committed offset 488 for partition publish.LargeTopic.2.test-0
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed

When the message size changes, you may need to adjust the following two properties: heartbeat-interval: 1000 and max-poll-records: 50.

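The heartbeat interval is 1 second and the maximum poll wait is 10 seconds. If the messages are large and you process the consumed messages in the same thread, the heartbeat check will fail when the next poll is triggered. Make sure the messages are processed by an Executor using a Callable.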
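
A minimal sketch of that suggestion, using only java.util.concurrent. `RecordWorker`, `submit`, and `await` are hypothetical names, and the record is represented by plain values so the example stays self-contained. How to acknowledge offsets safely and keep per-partition ordering once processing leaves the listener thread is not covered here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical worker that processes records outside the polling thread. */
final class RecordWorker {

    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    /** Hands the record to the pool and returns immediately with a Future. */
    Future<String> submit(String topic, int partition, long offset, String value) {
        Callable<String> task = () -> {
            // Placeholder for the real, possibly slow, processing of 'value'.
            int length = value.length();
            return topic + "-" + partition + "@" + offset + " (" + length + " chars)";
        };
        return executor.submit(task);
    }

    /** Waits for a result and converts checked exceptions into unchecked ones. */
    static String await(Future<String> future) {
        try {
            return future.get(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Processing failed", e);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        RecordWorker worker = new RecordWorker();
        Future<String> result = worker.submit("publish.LargeTopic.2.test", 0, 488L, "payload");
        System.out.println(await(result));
        worker.shutdown();
    }
}
```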

When the messages are large, increase the heartbeat interval to between 5 and 10 seconds and reduce max poll records to 15. I hope this helps.
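
Expressed as raw Kafka consumer properties, the suggested values could look like the sketch below. `TunedConsumerProps` is a hypothetical name; heartbeat.interval.ms and max.poll.records are the client-level counterparts of heartbeat-interval and max-poll-records in the YAML above. The 5000 ms value is the lower end of the suggested range. One caveat to keep in mind: the Kafka client requires the heartbeat interval to be lower than session.timeout.ms, so a value near 10 seconds would presumably also need a higher session timeout; the check method only mirrors that constraint for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical holder for the consumer settings suggested in the answer. */
final class TunedConsumerProps {

    /** Returns the two suggested overrides as plain Kafka property names. */
    static Map<String, Object> build() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("heartbeat.interval.ms", 5_000); // was 1000 in the question
        props.put("max.poll.records", 15);         // was 50 in the question
        return props;
    }

    /** Mirrors the client rule that the heartbeat must be lower than the session timeout. */
    static void requireHeartbeatBelowSessionTimeout(int heartbeatMs, int sessionTimeoutMs) {
        if (heartbeatMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                    "heartbeat.interval.ms must be lower than session.timeout.ms");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = build();
        // 10000 ms is used here as an assumed session timeout for the check.
        requireHeartbeatBelowSessionTimeout((Integer) props.get("heartbeat.interval.ms"), 10_000);
        System.out.println(props);
    }
}
```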
