Kafka consumer keeps rebalancing

Even though it is not processing any source records, my Kafka consumer client keeps rebalancing between frequent polls.

In addition, I have implemented consumer.pause() and consumer.resume() at what I believe are the right places, to stop the consumer from polling until the records returned by a poll have been processed by the backend API. I am not sure what is causing the issue.

Consumer logs

2022-07-21 10:27:08; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 2147444144 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"
2022-07-21 10:27:25; LOG_LEVEL="ERROR"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Offset commit failed on partition MySourceTopic-0 at offset 38572: The coordinator is not aware of this member."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] OffsetCommit failed with Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'}: The coordinator is not aware of this member."
2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Asynchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Failing OffsetCommit request since the consumer is not part of an active group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Attempt to heartbeat with stale Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error"
2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Synchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Lost previously assigned partitions MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully joined group with generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully synced group in generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Notifying assignor about the new Assignment(partitions=[MySourceTopic-0, MySourceTopic-1, MySourceTopic-2, MySourceTopic-3])"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Adding newly assigned partitions: MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-0 to the committed offset FetchPosition{offset=38572, offsetEpoch=Optional[132], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 33708 rack: null)], epoch=132}}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-2 to the committed offset FetchPosition{offset=38566, offsetEpoch=Optional[122], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 39217 rack: null)], epoch=122}}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-1 to the committed offset FetchPosition{offset=38779, offsetEpoch=Optional[118], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 30678 rack: null)], epoch=118}}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-3 to the committed offset FetchPosition{offset=38585, offsetEpoch=Optional[121], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 38300 rack: null)], epoch=121}}"
2022-07-21 10:27:30; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
2022-07-21 10:27:35; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"

Consumer configuration

auto.commit.interval.ms = 5000
auto.offset.reset = earliest
enable.auto.commit = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
max.poll.interval.ms = 300000
max.poll.records = 100
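
For context, a minimal sketch of how a consumer with this configuration might be assembled in Java. The bootstrap server and the String deserializers below are placeholders, not taken from the original setup; the question's consumer actually reads Avro GenericRecord values.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSetupSketch {
    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        // Placeholder connection details, not from the original question
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerHost:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyKAFKAConsumeGroupId1");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "MyKAFKAConsumerClientId1");
        // The settings listed above
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
        // Placeholder deserializers to keep this sketch self-contained
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("MySourceTopic"));
        return consumer;
    }
}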

Consumer poll() implementation

while (true) {
    try {
        ConsumerRecords<String, GenericRecord> records = null;
        LOGGER.info("Polling...");
        records = consumer.poll(Duration.ofSeconds(5));
        if (records != null && records.count() > 0) {
            consumer.pause(consumer.assignment());
            recordExecutor.execute(records);
        }
        consumer.resume(consumer.assignment());
        consumer.commitSync();
    } catch (SerializationException e) {
        continue;
    } catch (Exception e) {
        continue;
    }
}

Thanks in advance.

There are a few points I would like to mention here.

  1. For your use case, you ideally should not be using consumer.pause() and consumer.resume(). If no records are available, consumer.poll() will simply block for the duration passed to it, so it should not cause any high CPU usage; it is not a non-blocking call (see the simplified loop sketch after this list).

From the documentation of consumer.poll():

This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned.

  2. max.poll.interval.ms is not the same thing as the duration you pass to consumer.poll(). If they were the same, consumer.poll() would not take that argument at all. max.poll.interval.ms is the maximum delay allowed between invocations of poll().

     So when there are no records, you end up waiting longer than max.poll.interval.ms before the next poll() call. Since the next poll() is not made within this timeout, the consumer is considered failed.

From the documentation of max.poll.interval.ms:

If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

  3. Also, you seem to be using commitSync() while your configuration has enable.auto.commit=true. Ideally, you should use only one of the two.

  4. If you do not want to rethrow the exception, at least log it in the catch block.
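
Putting those points together, here is a minimal sketch of what the poll loop could look like without pause()/resume(), with auto-commit disabled in favour of an explicit commitSync(), and with exceptions logged. It assumes enable.auto.commit=false and that consumer, recordExecutor and LOGGER are set up as in the question.

// Sketch only; assumes enable.auto.commit=false in the consumer configuration.
while (true) {
    try {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
        if (!records.isEmpty()) {
            // Process the batch; this must finish well within max.poll.interval.ms
            recordExecutor.execute(records);
            // Commit explicitly only after the batch has been processed
            consumer.commitSync();
        }
    } catch (SerializationException e) {
        LOGGER.error("Failed to deserialize record, skipping", e);
    } catch (Exception e) {
        LOGGER.error("Unexpected error in poll loop", e);
    }
}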

After extensive analysis, we identified the root cause of these frequent rebalances: session timeouts between the consumers/streams and the group coordinator (the leader broker). The session timeouts kept occurring because of capacity problems (100% CPU utilization) on the servers hosting these consumers/streams. When the CPU was saturated at 100%, all of these Kafka clients became unresponsive and their sessions eventually timed out.

Event log

group-MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 21474 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted.

I faced a similar issue in one of my applications. As you can see from the logs, the error is

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 2147444144 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted."

This error occurs when you hit the session timeout on the Kafka consumer, i.e. the group coordinator has not received a heartbeat from the consumer's heartbeat thread within the configured time limit (oddly, the Kafka consumer logs this at INFO level). In Java, the config property that controls this is session.timeout.ms. Eventually the offset commit fails and your application goes through a rebalance. I ran into this problem in my own application, and it was a nasty one.

The interesting question is why we were hitting this error in the application in the first place. We profiled the application's memory with Java Flight Recorder and noticed that there were always a few long GC pauses around the times the session timeouts occurred. On top of that, we were using an older version of the Kafka API, where the default session timeout was only 10 seconds; it has since been increased to 45 seconds in recent versions.

Once I changed the session.timeout.ms property to 45 seconds, the error went away.
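
For reference, a minimal sketch of how that override could be applied when building the consumer, assuming the same Properties object (props) used to construct the consumer in the earlier sketch:

// Sketch: raise the session timeout to 45 seconds (the default in recent clients, 3.0+).
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");    // session.timeout.ms
// Heartbeats should stay well below the session timeout (commonly about one third of it).
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");  // heartbeat.interval.ms

Note that the broker also bounds acceptable values via group.min.session.timeout.ms and group.max.session.timeout.ms, so the chosen value must fall within that range.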
