Spring Kafka 批处理错误处理程序 - 使用手动提交的反序列化器错误处理



我的服务在尝试处理 JSON 反序列化程序错误时陷入无限循环。我的服务正在使用manual_immediate确认模式,自动偏移重置为 false。我在主代码中使用acknowledge.acknowledge()提交批处理记录,但在批处理错误处理程序中,我无法为无效消息提交偏移量。我尝试了ConsumerAwareBatchErrorHandlerBatchErrorHandlerisAckAfterHandle()方法或consumer.commitSync()不起作用。

问题 1:需要了解确认批处理/提交偏移的过程。 问题 2:我获取的数据为空。我试图从数据(为 null)或抛出异常中读取原始消息,但失败了。

有人可以帮我提交偏移量并移动到下一批吗?我希望在死信或错误队列中插入失败的消息,然后继续下一批消息。

尝试过的代码:

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMsConfig);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return props;
}

@Bean
public DefaultKafkaConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer(LocationRecordDto.class));
}

@Bean(KAFKA_LISTENER)
public ConcurrentKafkaListenerContainerFactory<String, MyDTO> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MYDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchErrorHandler(new ConsumerAwareBatchErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {

if (thrownException instanceof SerializationException){
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, offset + 1);
}
//Code to push data in error queue
//consumer.commitSync();
}

@Override
public boolean isAckAfterHandle() {
return true;
}
});
return factory;
}

您必须在侦听器而不是错误处理程序中处理反序列化异常,并正常提交批处理偏移量。

或者考虑改用新的 RecoveringBatchErrorHandler

最新更新