@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> dcmContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(**consumerEventFactory()**);
Map<String, String> micrometerTags = new HashMap<>();
micrometerTags.put(KafkaCommonConfig.CONSUMER_TAG, TAG_VALUE);
factory.getContainerProperties().setMicrometerTags(micrometerTags);
return factory;
}
consumerEventFactory ()在上面是从下面调用的:
@Bean
public ConsumerFactory<String, String> consumerEventFactory() {
Map<String, Object> config = new HashMap<>();
config.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CCMReader.getCcmDcmConsumerConfig().getBootStrapServers());
config.put(
ConsumerConfig.GROUP_ID_CONFIG, "randommmmm");
config.put(
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
CCMReader.getCcmDcmConsumerConfig().getMaxPollIntervalMs());
config.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
CCMReader.getCcmDcmConsumerConfig().getMaxPollRecords());
config.put(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
CCMReader.getCcmDcmConsumerConfig().getRequestTimeOutMs());
config.put(
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
CCMReader.getCcmDcmConsumerConfig().getHeartbeatIntervalMs());
config.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true");
config.put(
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
CCMReader.getCcmDcmConsumerConfig().getAutoCommitIntervalMs());
config.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(
config, new StringDeserializer(), new StringDeserializer());
}
由于没有错误处理程序,因此应该拒绝所有异常。然而,异常事件被一次又一次地调用(无限次)。不知道发生了什么…目前唯一的结论就是黑魔法。请帮助!
异常发生时发生了什么?消费者进程是否正在重新启动?
Since there is no error handler, all the exceptions should be rejected.
-不,我不这么认为。
在process方法中没有处理异常时,Kafka消费者退出是正常的。因此,一旦消费者再次启动(您的docker的重启策略可能是),它开始读取相同的事件,因为这些事件尚未从该客户端id提交回代理为成功。
我不确定Spring Kafka,但我觉得你应该定义异常处理,以确保这些事件被正确处理,以便它们作为正常事件提交,而不是杀死消费者并进入无限循环。
由于没有错误处理程序,所以…
默认的错误处理程序(2.8及更高版本)是DefaultErrorHandler
(早期版本为SeekToCurrentErrorHandler
)。
这两个错误处理程序将重试9次。