spring cloud stream kafka CommitFailedException



I am using spring-cloud-starter-stream-kafka 3.0.4.RELEASE, and I get the following error:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:900)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1104)

Configuration:

spring:
  cloud:
    stream:
      bindings:
        default:
          content-type: application/*+avro
        inputAddAccount:
          destination: dev_account
          group: dev_group
        ouputUpdateFile:
          destination: dev_update_file
          group: dev_group
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: kafka1-dev:6667
          auto-create-topics: false
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 1000
            specific.avro.reader: true
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: https://schema-registry1.dev:8088
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: reader:reader
          producer-properties:
            acks: -1
            retries: 2147483647
            max.in.flight.requests.per.connection: 1
            request.timeout.ms: 10000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: https://schema-registry1.dev:8088
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: reader:reader


This exception occurs when your consumer takes longer to process the messages returned by a poll() than session.timeout.ms or max.poll.interval.ms allows. Looking at your configuration, I don't see either of these properties, which means your consumer is running with the defaults (max.poll.interval.ms defaults to 300000 ms, i.e. 5 minutes, and max.poll.records to 500). Measure the average time it takes to process a batch of records and, if that is longer than the configured interval, override these properties with larger values, or reduce max.poll.records so each batch finishes sooner.
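For reference, in this binder configuration those properties would sit under the consumer-properties key you already have. The snippet below is a minimal sketch; 600000 and 100 are placeholder values to be tuned against your actual processing time, not recommended settings:

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties:
            # allow up to 10 minutes between poll() calls before the group
            # coordinator evicts this consumer (default is 300000 ms / 5 min)
            max.poll.interval.ms: 600000
            # fetch fewer records per poll() so each batch finishes sooner
            # (default is 500)
            max.poll.records: 100

Of the two, lowering max.poll.records is usually the safer first step: raising max.poll.interval.ms also delays rebalancing when a consumer genuinely hangs.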
