Camel事务处理的JMS组件不适用于聚合器



我发现选项为transactiond=true的JMS组件不能很好地与Aggregator配合使用。我的路由在到达聚合器之前是事务性的,在聚合器之后,我的路由就不再是事务性了。如果我理解得很好,这是因为Aggregator在每次交换转换时都会启动新的线程,而这种行为会停止第一个事务线程,从而导致事务提交和ack发送到队列。因此消息不再在队列中。这是正确的吗?

我需要使用Camel读取队列(RabbitMQ(中的所有消息,然后将它们全部转换为一个List,最后将List中的数据写入文件。所有这些都必须是事务性的,如果路由中发生一些错误,消息应该保留在队列中。到目前为止,我所做的基本上是使用jms组件(选项transactiond=true(从队列中读取消息,然后使用聚合器聚合所有消息,然后发送到文件。

我的代码的缩写示例:

from("jms:queue:someQUeue?transacted=true")
// some more processing in the route                          <- route is transactional here
.aggregate((constant(true)), new MyAggregationStrategy())
.completionInterval(500)
.process(new Processor() {                                    <- route is not transactional anymore
// some processing
})
.toD("file://C/tmp...")

我认为这方面的完美组件是简单JMS批处理,但我们使用的是Camel 3.8并且此组件在本版本和更新版本中已不存在。为什么?有什么新东西代替这个吗?

此外,SJMSSJMS2对我没有帮助,当我使用它们时,消费者indeed读取所有消息(队列中的所有消息都未确认-等待确认(,但通过路由的处理仍然是一条接一条的消息。我如何使这成为一个交换所有消息的列表?

还有其他解决方案吗?(例如,制作一些自定义聚合器-但如何?(

据我所知,这是正确的!如果您的意图是实现有保证的交付,那么在这种情况下,您最好使用持久聚合存储库。

最新更新