我们使用的是kafka经纪人2.10和kafka java驱动程序2.0.1和kafka流驱动程序2.0.1。
我们正在使用ChangElog恢复状态,大约需要80-120分钟。与此同时,流媒体消费者的来源主题陷入了重新平衡。成功恢复州后,来源主题消费者组被困在重新平衡
中Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, ENDPOINT);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, busName);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(StreamsConfig.CLIENT_ID_CONFIG, "CLMB");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
config.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIR + "/streams");
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 40_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80_000);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 40000);
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 256 * 1024 * 1024);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32 * 1024 * 1024);
config.put(ConsumerConfig.CLIENT_ID_CONFIG, ENDPOINT);
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Collections.singletonList(StickyAssignor.class));
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120 * 1000);
config.put(ProducerConfig.BATCH_SIZE_CONFIG,5000);
config.put(ProducerConfig.RETRIES_CONFIG,2);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,200L);
您可以检查kafka server.log上的任何收缩/扩展。
也许您正在击中此错误,在这种情况下,您可以升级到2.2(或更高(,看看是否已修复。