spring-kafka消费者批处理错误与spring启动版本2.3.7



我正在尝试执行spring kafka批处理错误处理。首先我有几个问题。

  1. 侦听器和容器错误处理程序的区别是什么?这两类错误包括哪些错误?

  2. 你能帮助一些样品更好地理解这个吗?

这是我们的设计:

  1. 每隔一定时间轮询一次
  2. 以批处理方式使用消息
  3. 基于键推送到本地缓存(应用程序缓存)(以避免重复事件)
  4. 批处理完成后,将所有值逐一推送到另一个主题。
  5. 在操作3完成后清除缓存并手动确认偏移量。

下面是我的错误处理计划:

public ConcurrentKafkaListenerContainerFactory<String, String> myListenerPartitionContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
factory.setConcurrency(partionCount);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setIdleBetweenPolls(pollInterval);
factory.setBatchListener(true);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myPartitionsListenerContainerFactory() 
{
return myListenerPartitionContainerFactory(groupIdPO);
}

@Bean
public RecoveringBatchErrorHandler(KafkaTemplate<String, String> errorKafkaTemplate) {
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(errorKakfaTemplate);
RecoveringBatchErrorHandler errorHandler =
new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 5000)); // push error event to the error topic
}

@KafkaListener(id = "mylistener", topics = "someTopic", containerFactory = "myPartitionsListenerContainerFactory"))
public void listen(List<ConsumerRecord<String, String>> records, @Header(KafkaHeaders.MESSAGE_KEY) String key, Acknowledgement ack) {
Map hashmap = new Hashmap<>();
records.forEach(record -> {
try {
//key will be formed based on the input record - it will be id.
hashmap.put(key, record);  
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", record);
}

});
// Once success each messages to another topic.
try {
hashmap.forEach( (key,value) -> {  push to another topic })
hashmap.clear();
ack.acknowledge();
} catch(Exception ex) {
//handle producer exceptions
}
}

方向好还是需要改进?以及需要实现哪种类型的容器和侦听器处理程序?

@Gary罗素……你能帮我一下吗?

侦听器错误处理程序用于请求/应答情况,在这种情况下错误处理程序可以向发送方返回有意义的应答。

你需要抛出一个异常来触发容器错误处理程序,你需要在原始批处理的索引中知道哪条记录失败。

如果你使用这样的手动ack,你可以使用nack()方法来指示哪个记录失败了(在这种情况下不要抛出异常)。

最新更新