我的应用程序正在从kafka中读取一个主题,在丰富它之后,它将它保存到另一个主题。CCD_ 1被配置为CCD_。我有两个经纪人和12个分区。
Topic: enriched-request PartitionCount: 12 ReplicationFactor: 2 Configs: min.insync.replicas=1,flush.ms=86400000,segment.bytes=1073741824,flush.messages=1073741824,max.message.bytes=1000000,index.interval.bytes=4096,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=259200000,segment.ms=604800000,segment.index.bytes=10485760
Topic: enriched-request Partition: 0 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 1 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 2 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 3 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 4 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 5 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 6 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 7 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 8 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 9 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 10 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 11 Leader: 8 Replicas: 8,7 Isr: 7,8
在我的测试环境中两周后,我收到了日志消息:INFO AbstractCoordinator:336 - [Consumer clientId=my-enrichments-client-StreamThread-4-consumer, groupId=my-enrichments] (Re-)joining group
和这8个线程。每5分钟就会触发一次,每次都会触发一组新的8个线程。
在我的kafka.log
中,我看到:CCD_ 5。与上面的内容相同,每5分钟就会删除一组新的8个线程。
只有一个应用程序在我的测试环境中运行。我试着等待15-20分钟来重新部署,但总是出现同样的错误。有人知道我如何在不必更改StreamsConfig.CLIENT_ID_CONFIG
或StreamsConfig.APPLICATION_ID_CONFIG
的情况下解决它吗?
消费者配置
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichments");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-enrichments-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, my bootstrap sersvers);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
如果说它有什么帮助的话,那就是在我将线程数从3
增加到8
一周左右后开始发生的。我不知道它是否与此有关。
由于找不到答案,我尝试了以下方法:
- 描述小组。CCD_ 10。有了这个,我得到了警告
StreamsConfig.NUM_STREAM_THREADS_CONFIG
1 - 删除它。
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --delete --group my-enrichments
这导致了错误:
Error: Deletion of some consumer groups failed:
* Group 'my-enrichments' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
- 重新启动代理修复了问题