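I am using Spring Kafka in my project and have implemented retries. When a message is consumed and processing fails, I need to retry it. The code below works fine: when the listener throws one of the configured exceptions, the record is retried.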
@Component
@Slf4j
public class Consumer implements AcknowledgingMessageListener<String, String> {

    @Override
    @KafkaListener(topics = { "ff808081672c17c8016730733d020001.CBS_PROFILE" })
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        log.info("Computed ");
        // always fail, to exercise the retry path
        throw new RuntimeException();
    }
}
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    /** handles error and recovery logic **/
    public DefaultErrorHandler errorHandler() {
        System.out.println("Inside error handler");
        // if these exceptions occur in the consumer, record will not be retried
        List<Class<? extends Exception>> exceptionsToIgnore = List.of(JsonSchemaValidationException.class);
        // if these exceptions occur in the consumer, record will be retried
        List<Class<? extends Exception>> exceptionsToRetry = List.of(DataEngineNotAvailableException.class, ListenerExecutionFailedException.class);
        // retry once after a fixed interval of 500 ms
        var fixedBackOff = new FixedBackOff(500L, 1);
        // configured but not used: the handler below is built with fixedBackOff
        var expBackoff = new ExponentialBackOffWithMaxRetries(2);
        expBackoff.setInitialInterval(1000L);
        var errorHandler = new DefaultErrorHandler(fixedBackOff);
        errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -> {
            log.info("Failed record {} in retry listeners, Exception : {}, delivery attempt : {}", consumerRecord,
                    ex.getMessage(), deliveryAttempt);
        });
        exceptionsToIgnore.forEach(errorHandler::addNotRetryableExceptions);
        exceptionsToRetry.forEach(errorHandler::addRetryableExceptions);
        return errorHandler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        // should be equal to number of partitions
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(errorHandler());
        return factory;
    }
}
But if I implement the consumer as shown below, I cannot get the retry mechanism to work. I expect the dataProcessor.readRecord(r, consumer) method to be retried.
@PostConstruct
public void onLoad() {
    configService.setConfigurations();
    // load consumers
    consumerRunnerV2.loadConsumers();
}

public void loadConsumers() {
    consumerServiceV2.fillConsumerMap();
    Map<String, LRConsumer> consumersMap = consumerServiceV2.getConsumersMap();
    for (Map.Entry<String, LRConsumer> consumer : consumersMap.entrySet()) {
        consumerTaskExecutor.execute(() -> runConsumer(consumer.getValue()));
    }
}

private void runConsumer(LRConsumer consumer) {
    while (true) {
        // re-read the consumer to check its current state
        consumer = consumerServiceV2.getConsumerById(consumer.getConsumerId());
        if (consumer.getState() != ConsumerState.PAUSED) {
            log.info("Waiting to receive messages.");
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> r : records) {
                // expectation is to retry this method
                dataProcessor.readRecord(r, consumer);
            }
        } else {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Thread.sleep throws a checked exception; restore the flag and stop the loop
                Thread.currentThread().interrupt();
                return;
            }
            log.info("Consumer is in paused state.");
        }
    }
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
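That certainly won't work; you are bypassing the Spring code entirely and interacting with the KafkaConsumer yourself.
In that case, you will have to implement your own error handling.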
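As a rough illustration of what "your own error handling" could look like, here is a minimal sketch in plain Java of a fixed back-off retry wrapper. The class name ManualRetry and the method processWithRetry are hypothetical (they are not part of Spring Kafka or the Kafka client); the sketch only mimics what FixedBackOff(500L, 1) gives you inside a listener container.

```java
import java.util.function.Consumer;

// Hypothetical helper, not a Spring Kafka or Kafka client API.
class ManualRetry {

    /**
     * Runs the processor once and, if it throws a RuntimeException,
     * retries up to maxRetries more times, sleeping backOffMillis between attempts.
     * Returns true if any attempt succeeded, false if all attempts failed
     * or the thread was interrupted while backing off.
     */
    static <T> boolean processWithRetry(T record, Consumer<T> processor,
                                        int maxRetries, long backOffMillis) {
        int totalAttempts = maxRetries + 1;
        for (int attempt = 1; attempt <= totalAttempts; attempt++) {
            try {
                processor.accept(record);
                return true;
            } catch (RuntimeException ex) {
                System.err.printf("Attempt %d of %d failed: %s%n",
                        attempt, totalAttempts, ex.getMessage());
                if (attempt < totalAttempts) {
                    try {
                        Thread.sleep(backOffMillis);
                    } catch (InterruptedException ie) {
                        // restore the interrupt flag and give up
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        return false;
    }
}
```

In the poll loop from the question, the direct call to dataProcessor.readRecord(r, consumer) would be wrapped in something like processWithRetry(r, rec -> ..., 1, 500L), and a false return is where you would log the record or send it somewhere for later inspection. Keep in mind that time spent sleeping counts against max.poll.interval.ms, so long back-offs in a hand-written loop can trigger a rebalance.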
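However, you don't have to use @KafkaListener to use the container; you can use the container factory to create a container and add the listener to the container properties with setMessageListener.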
In that case, you should remove @KafkaListener so that the framework does not automatically configure a container for it.
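A minimal sketch of that approach might look like the following. It assumes a container factory bean that can be injected as ConcurrentKafkaListenerContainerFactory<String, String> (the factory in the question is declared with wildcards, so the types may need adjusting); the class name ManualContainerConfig, the bean name profileContainer, the group id "profile-group" and the handle method are all hypothetical placeholders.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class ManualContainerConfig {

    // Declared as a bean so the application context manages the container's lifecycle.
    @Bean
    public ConcurrentMessageListenerContainer<String, String> profileContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        // Create the container from the factory instead of using @KafkaListener.
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("ff808081672c17c8016730733d020001.CBS_PROFILE");

        // Assumption: a group id is needed here unless the consumer factory already sets one.
        container.getContainerProperties().setGroupId("profile-group");

        // The factory in the question uses AckMode.MANUAL, so the listener acknowledges explicitly.
        AcknowledgingMessageListener<String, String> listener = (rec, ack) -> {
            handle(rec.value()); // an exception thrown here goes to the container's error handler
            ack.acknowledge();
        };
        container.getContainerProperties().setMessageListener(listener);
        return container;
    }

    // Hypothetical processing method standing in for dataProcessor.readRecord(...).
    private void handle(String value) {
        // processing logic goes here
    }
}
```

Because the container comes from the factory, settings applied to the factory, such as the DefaultErrorHandler set through setCommonErrorHandler, should carry over to it, so retries are handled by Spring rather than by a hand-written poll loop.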