组中的成员失败触发Kafka中的常量(Re-)加入组



我的应用程序正在从kafka中读取一个主题,在丰富它之后,它将它保存到另一个主题。CCD_ 1被配置为CCD_。我有两个经纪人和12个分区。

Topic: enriched-request PartitionCount: 12  ReplicationFactor: 2    Configs: min.insync.replicas=1,flush.ms=86400000,segment.bytes=1073741824,flush.messages=1073741824,max.message.bytes=1000000,index.interval.bytes=4096,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=259200000,segment.ms=604800000,segment.index.bytes=10485760
Topic: enriched-request Partition: 0    Leader: 7   Replicas: 7,8   Isr: 7,8
Topic: enriched-request Partition: 1    Leader: 8   Replicas: 8,7   Isr: 7,8
Topic: enriched-request Partition: 2    Leader: 7   Replicas: 7,8   Isr: 7,8
Topic: enriched-request Partition: 3    Leader: 8   Replicas: 8,7   Isr: 7,8
Topic: enriched-request Partition: 4    Leader: 7   Replicas: 7,8   Isr: 7,8
Topic: enriched-request Partition: 5    Leader: 8   Replicas: 8,7   Isr: 7,8
Topic: enriched-request Partition: 6    Leader: 7   Replicas: 7,8   Isr: 7,8
Topic: enriched-request Partition: 7    Leader: 8   Replicas: 8,7   Isr: 7,8
Topic: enriched-request Partition: 8    Leader: 7   Replicas: 7,8   Isr: 7,8
Topic: enriched-request Partition: 9    Leader: 8   Replicas: 8,7   Isr: 7,8
Topic: enriched-request Partition: 10   Leader: 7   Replicas: 7,8   Isr: 7,8
Topic: enriched-request Partition: 11   Leader: 8   Replicas: 8,7   Isr: 7,8

在我的测试环境中两周后,我收到了日志消息:INFO AbstractCoordinator:336 - [Consumer clientId=my-enrichments-client-StreamThread-4-consumer, groupId=my-enrichments] (Re-)joining group和这8个线程。每5分钟就会触发一次,每次都会触发一组新的8个线程。

在我的kafka.log中,我看到:CCD_ 5。与上面的内容相同,每5分钟就会删除一组新的8个线程。

只有一个应用程序在我的测试环境中运行。我试着等待15-20分钟来重新部署,但总是出现同样的错误。有人知道我如何在不必更改StreamsConfig.CLIENT_ID_CONFIGStreamsConfig.APPLICATION_ID_CONFIG的情况下解决它吗?

消费者配置

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichments");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-enrichments-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, my bootstrap sersvers);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

如果说它有什么帮助的话,那就是在我将线程数从3增加到8一周左右后开始发生的。我不知道它是否与此有关。

由于找不到答案,我尝试了以下方法:

  1. 描述小组。CCD_ 10。有了这个,我得到了警告StreamsConfig.NUM_STREAM_THREADS_CONFIG1
  2. 删除它。bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --delete --group my-enrichments这导致了错误:
Error: Deletion of some consumer groups failed:
* Group 'my-enrichments' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
  1. 重新启动代理修复了问题

最新更新