调用提交偏移量的正确方法



我使用的是Spring集成框架,其中输入通道是kafka,输出是jdbc

我想手动提交kafka偏移,只有在jdbcMessageHandler成功处理每个kafka消息之后。

@Bean
@ServiceActivator(inputChannel = "outChannel")
public MessageHandler jdbcMessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(getDataSource(), getSql());
jdbcMessageHandler.setPreparedStatementSetter((ps, message) -> {
Item item = ((Item) message.getPayload());
ps.setString(1, item.getName());
Acknowledgment ack = (Acknowledgment) message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
ack.acknowledge();
} 
return jdbcMessageHandler;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {    
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
...
return new DefaultKafkaConsumerFactory<>(props);
}     
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());                
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
...
return factory;
}

我试过了,从上面可以看出:

Acknowledgment ack = (Acknowledgment) message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
ack.acknowledge();

但它会产生不想要的效果:

每个kafka消息可以有n-Items,因此transformer返回一个项目列表,因此将为每个项目调用ack.acknowledge((,n次!

并且我只想在处理完消息的所有项之后调用commit一次。

更新

应用推荐的from答案后。

我打开

ConcurrentKafkaListenerContainerFactory
.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 99L)));

我也有

@ServiceActivator(inputChannel = "errorChannel")
public void onError(ErrorMessage message) {

}

发生了什么:在JdbcMessageHandler中发生错误,

onError被触发一次。没有重试,kafka偏移已提交。

我需要防止提交补偿。

更新2

流程:

1(kafkainput->PublishSubscribeChannel,附加到KafkaMessageDrivenChannelAdapter->KafkaListenerContainerFactory->KafkaMessageListenerContainer

也尝试设置listener.setErrorHandler(...

2(

订阅者:

@Transformer(inputChannel = "kafkainput", outputChannel = "aggregator")
@ServiceActivator(inputChannel = "kafkainput")

聚合器->发布订阅频道

订阅者:

`@ServiceActivator(inputChannel = "aggregator")`
public FactoryBean<MessageHandler> aggregatorFactoryBean(..
AggregatorFactoryBean aggregatorFactoryBean =
aggregatorFactoryBean.setOutputChannel(outputChannel);
  1. 输出通道->DirectChannel

订阅者:

@ServiceActivator(inputChannel = "outputChannel")
public MessageHandler jdbcMessageHandler() {

jdbc中出现错误
则只有onError(..)被触发

更新3

做了很多更改,去掉了聚合器,转而使用:

=  new KafkaMessageDrivenChannelAdapter<>(container, KafkaMessageDrivenChannelAdapter.ListenerMode.batch)
kafkaMessageDrivenChannelAdapter.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
kafkaMessageDrivenChannelAdapter.setErrorChannelName("error");
kafkaMessageDrivenChannelAdapter.setOutputChannelName("splitter");

设置在KafkaListenerContainerFactory

factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(500,10000)));

在`KafkaMessageListenerContainer'中设置

.setAckMode(ContainerProperties.AckMode.BATCH);

我有拆分器:

@Splitter(inputChannel = "splitter", outputChannel = "outputChannel")

在kafka中,我放置了损坏的消息,因此拆分器中出现错误(我正在抛出MessagingException,然后"onError"只触发一次,并且提交了kafka偏移!

@ServiceActivator(inputChannel = "error")
public void  onError(ErrorMessage message) {
}

为什么它不重试配置的次数,为什么它立即提交偏移?

您需要考虑让自己熟悉Publish-Subscribe模式。例如,一个PublishSubscribeChannel可以有几个订阅者来处理同一消息。因此,除了生成项目列表的转换器之外,您还可以有一个服务激活器,当JDBC通道适配器处理批处理中的所有项目时,它只调用ack.acknowledge()一次。但是,当然,变压器的输入通道必须是PublishSubscribeChannel。您还可以考虑为您的订阅者显式指定order选项,以确保以正确的顺序调用它们。

另一种方式是RecipientListRouter

有关更多信息,请参阅文档:

https://docs.spring.io/spring-integration/reference/html/core.html#channel-实现publishsubscribechannel

https://docs.spring.io/spring-integration/reference/html/message-routing.html#router-实现收件人列表路由器

当然还有CCD_ 15注释JavaDocs。

相关内容

  • 没有找到相关文章

最新更新