我正在用骆驼处理大文件。这个想法是通过行分开源文件,并将每行作为单个消息发送到AKKA组件,该组件将每行作为独立消息处理。Akka组件最终使用另一个路由输出文件(但这与此问题无关)。
在处理特定行时,Akka组件可能会遇到错误,在这种情况下,我想向文件消费者骆驼组件发出信号,停止阅读文件并将其移至另一个位置。由于文件可能很大(最多500 MB的文本),因此我使用的是流式模式和节流组件,我需要向Akka组件中的错误发出信号,并在文件消费者中触发MoveFailed选项,但是我无法工作。
这是我的路线:
<route id="splitFile" >
<from uri="file:C:/tmp/in_test?preMove=staging&move=.done&moveFailed=.error&readLock=changed&readLockCheckInterval=1500"/>
<split streaming="true" stopOnException="true" shareUnitOfWork="true" >
<tokenize token="rn" group="1"/>
<setHeader headerName="CamelSplitIndex"> <!-- line number basically -->
<simple>property.CamelSplitIndex</simple>
</setHeader>
<setHeader headerName="CamelSplitComplete"> <!-- did I read the last line? -->
<simple>property.CamelSplitComplete</simple>
</setHeader>
<throttle timePeriodMillis="1000">
<constant>850</constant>
<to uri="vm:PROCESSOR_RECEIVER"/>
</throttle>
</split>
</route>
然后在Akka中发生错误时,我正在报告(按照此类指南(按照"交货确认"的这些准则))这样的:
- 有一个Akkacamel Actor,其中消费者绑定到VM:Processor_receiver
- 在其接收方法中,这位演员确实
- 发送者!ACK如果成功
- 发送者!akka.actor.status.failure(ChannelException())如果不成功
我所看到的是,我从O.A.C.P.Defaulterrorhandler获得了详细的日志"(MessageID:... on ExchangeId:...),其中包括"消息历史记录"和有关交换的信息的详细信息。移至舞台。
我想发生的事情再次是,组件停止将内容发送到VM:passecor_receiver,并且文件移至stating.error。
我已经尝试了该路由上的变体,包括在文件uri选项上指定consumer.bridgeerrorhandler = true,在路由声明上添加了一个onexception标签。但是我无法获得所需的行为。
总结一下问题是:如何从路由的目的地中停止流式传输内容并触发文件消费者中的移动选项?(尤其是目的地是Akka骆驼组件时)。
事实证明,问题在于使用VM:组件与Akka通信。由于VM:是异步的,因此不会传播异常"向上"。简单地将组件更改为直接:使其起作用,即,而不是
<to uri="vm:PROCESSOR_RECEIVER"/>
更改为
<to uri="direct:PROCESSOR_RECEIVER"/>