我使用Apache Camel已经有一段时间了,我遇到了拆分器功能。我想知道为什么我不能在某些条件下停止整个分裂过程?在谷歌上搜索了几个小时后,我没有找到任何关于它的信息。我只找到了stopOnException()
和stopOnAggregateException()
。但如果我想在一些特定的异常或甚至是一些条件下停止它,这取决于消息数据。有人能告诉我怎么做吗?
让我们考虑一下代码的平和性
from("timer://file-poll?period=5s")
.onException(Exception.class)
.log(LoggingLevel.ERROR, "Error message")
.end()
.to("direct:load-file") // load csv
.split(body()).delimiter("n") // split file into lines
.to("direct:process-line") // throws exceptions
.end()
.to("direct:save-result");
我轮询文件系统并加载一个1,000行的csv文件。
然后我将文件分成几行,并分别处理每一行。
direct:process-line
路由可以抛出异常,如果它抛出LineProcessingException
,我需要跳过这行并继续处理下一行,但如果它抛出任何其他异常,我需要停止拆分器。
例如:处理第15行抛出LineProcessingException
,我跳过这一行,继续处理其余的985行。
处理第30行抛出IOException
,所以现在我需要停止处理剩下的970行并转到direct:save-result
路由。
我尝试了stopOnException()
和stopOnAggregateException()
,但正如我上面所说的,它会在任何异常情况下停止拆分器,我不需要停止拆分器,如果LineProcessingException
被抛出,就跳过这行。我还尝试在处理线路的子路由上调用stop()
,但它只停止子路由而不是拆分器。
我仍然不相信过滤是一个糟糕的选择。如果您可以预先识别并忽略无效数据(正如您的问题所建议的),那么过滤是非常优雅的。
下面有两个示例,说明如何跳过已知无效且因此不可处理的数据。这样就不需要一个异常。希望它能给你解决实际问题的新思路。
from("timer:mainRouteTrigger?repeatCount=1")
.routeId("MainRouteTrigger")
// invalid value (BOOM)
.setBody(constant("100n200n300nBOOMn400"))
.to("direct:mainRoute")
.end()
;
from("direct:mainRoute")
.routeId("MainRoute")
.log("MainRoute BEGINS: BODY: ${body}")
.split(body().tokenize("n"))
.multicast()
.to("direct:processRecord1")
.to("direct:processRecord2")
.end()
.end()
.log("MainRoute ENDS: BODY: ${body}")
.end()
;
from("direct:processRecord1")
.routeId("processRecord1")
.filter(method(ValueFilter.class, "isValidValue"))
.log("processing record ${body}")
.end()
;
from("direct:processRecord2")
.routeId("processRecord2")
.process(e -> {
String body = e.getMessage().getBody(String.class);
if (ValueFilter.isValidValue(body)) {
e.setProperty("PROCESSED", true);
e.getMessage().setBody("processing record " + body);
}
})
.choice()
.when(simple("${exchangeProperty.PROCESSED}"))
.log("${body}")
.end()
;
import org.apache.camel.Body;
public class ValueFilter {
static public boolean isValidValue(@Body String body) {
switch (body) {
case "100":
case "200":
case "300":
case "400":
return true;
default:
return false;
}
}
}
运行示例时,记录的正确输出如下:
MainRoute INFO MainRoute BEGINS: BODY: 100
200
300
BOOM
400
processRecord1 INFO processing record 100
processRecord2 INFO processing record 100
processRecord1 INFO processing record 200
processRecord2 INFO processing record 200
processRecord1 INFO processing record 300
processRecord2 INFO processing record 300
processRecord1 INFO processing record 400
processRecord2 INFO processing record 400
MainRoute INFO MainRoute ENDS: BODY: 100
200
300
BOOM
400