我使用的是Spring集成框架,其中输入通道是kafka,输出是jdbc。
我想手动提交kafka偏移,只有在jdbcMessageHandler
成功处理每个kafka消息之后。
@Bean
@ServiceActivator(inputChannel = "outChannel")
public MessageHandler jdbcMessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(getDataSource(), getSql());
jdbcMessageHandler.setPreparedStatementSetter((ps, message) -> {
Item item = ((Item) message.getPayload());
ps.setString(1, item.getName());
Acknowledgment ack = (Acknowledgment) message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
ack.acknowledge();
}
return jdbcMessageHandler;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
...
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
...
return factory;
}
我试过了,从上面可以看出:
Acknowledgment ack = (Acknowledgment) message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
ack.acknowledge();
但它会产生不想要的效果:
每个kafka消息可以有n-Items,因此transformer返回一个项目列表,因此将为每个项目调用ack.acknowledge((,n次!
并且我只想在处理完消息的所有项之后调用commit一次。
更新
应用推荐的from答案后。
我打开
ConcurrentKafkaListenerContainerFactory
.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 99L)));
我也有
@ServiceActivator(inputChannel = "errorChannel")
public void onError(ErrorMessage message) {
}
发生了什么:在JdbcMessageHandler中发生错误,
onError被触发一次。没有重试,kafka偏移已提交。
我需要防止提交补偿。
更新2
流程:
1(kafkainput->PublishSubscribeChannel,附加到KafkaMessageDrivenChannelAdapter
->KafkaListenerContainerFactory
->KafkaMessageListenerContainer
也尝试设置listener.setErrorHandler(...
2(
订阅者:
@Transformer(inputChannel = "kafkainput", outputChannel = "aggregator")
@ServiceActivator(inputChannel = "kafkainput")
聚合器->发布订阅频道
订阅者:
`@ServiceActivator(inputChannel = "aggregator")`
public FactoryBean<MessageHandler> aggregatorFactoryBean(..
AggregatorFactoryBean aggregatorFactoryBean =
aggregatorFactoryBean.setOutputChannel(outputChannel);
- 输出通道->DirectChannel
订阅者:
@ServiceActivator(inputChannel = "outputChannel")
public MessageHandler jdbcMessageHandler() {
jdbc中出现错误
则只有onError(..)
被触发
更新3
做了很多更改,去掉了聚合器,转而使用:
= new KafkaMessageDrivenChannelAdapter<>(container, KafkaMessageDrivenChannelAdapter.ListenerMode.batch)
kafkaMessageDrivenChannelAdapter.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
kafkaMessageDrivenChannelAdapter.setErrorChannelName("error");
kafkaMessageDrivenChannelAdapter.setOutputChannelName("splitter");
设置在KafkaListenerContainerFactory
中
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(500,10000)));
在`KafkaMessageListenerContainer'中设置
.setAckMode(ContainerProperties.AckMode.BATCH);
我有拆分器:
@Splitter(inputChannel = "splitter", outputChannel = "outputChannel")
在kafka中,我放置了损坏的消息,因此拆分器中出现错误(我正在抛出MessagingException
,然后"onError"只触发一次,并且提交了kafka偏移!
@ServiceActivator(inputChannel = "error")
public void onError(ErrorMessage message) {
}
为什么它不重试配置的次数,为什么它立即提交偏移?
您需要考虑让自己熟悉Publish-Subscribe
模式。例如,一个PublishSubscribeChannel
可以有几个订阅者来处理同一消息。因此,除了生成项目列表的转换器之外,您还可以有一个服务激活器,当JDBC通道适配器处理批处理中的所有项目时,它只调用ack.acknowledge()
一次。但是,当然,变压器的输入通道必须是PublishSubscribeChannel
。您还可以考虑为您的订阅者显式指定order
选项,以确保以正确的顺序调用它们。
另一种方式是RecipientListRouter
。
有关更多信息,请参阅文档:
https://docs.spring.io/spring-integration/reference/html/core.html#channel-实现publishsubscribechannel
https://docs.spring.io/spring-integration/reference/html/message-routing.html#router-实现收件人列表路由器
当然还有CCD_ 15注释JavaDocs。