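When using a splitter and a routing slip to route the individual parts of a body to different endpoints, I found that a .end() is required after the routing slip; without it, steps that belong outside the split block end up inside it.
The desired behavior is to split the body and use a routing slip to route each part to a different endpoint. Once the split block completes, processing should continue with the exchange (and body) as they were before the split.
The test code has two routes that are identical except for the .end() after .routingSlip(). When the tests run, you can see that the route with the .end() produces three Inner processor messages and one Outer processor message, and that the body has the correct payload type after the split block completes. The other test, which uses the second route that does NOT have .end() after routingSlip(), produces three interleaved Inner and Outer processor messages.
I may have missed something in the documentation, but I could not find any example of a splitter used with routingSlip in this way that would have warned me that I need the .end() to get the behavior I want. If this is not a bug, I would suggest documenting it more prominently. I might have found it sooner, but my original code involved a custom splitter, and it was not obvious that the splitter was not the problem.
I also don't know whether this issue applies to recipientList or dynamicRouter as well.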
package org.apache.camel.processor.routingslip;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;

public class SpliterRoutingSlipTest extends CamelTestSupport {
    private static final String TEST_DIR = "target/test";
    private static final String TEST_OUT_ENDPOINT_WEND = "file:" + TEST_DIR + "/Wend";
    private static final String TEST_OUT_ENDPOINT_WOEND = "file:" + TEST_DIR + "/WOend";
    private static final String TEST_ROUTE_ID_WEND = "splitBodyTestWEnd";
    private static final String TEST_ROUTE_ID_WOEND = "splitBodyTestWOEnd";
    private static final String TEST_IN_ENDPOINT_WEND = "direct:" + TEST_ROUTE_ID_WEND;
    private static final String TEST_IN_ENDPOINT_WOEND = "direct:" + TEST_ROUTE_ID_WOEND;
    private static final String TEST_ROUTING_SLIP_HEADER = "toEndpoint";
    private static final List<String> TEST_BODY = Arrays.asList(
        "This is line 1",
        "This is line 2",
        "This is line 3"
    );

    @BeforeClass
    public static void init() throws IOException {
        // Start from a clean output directory.
        File dirToRemove = new File(TEST_DIR);
        if (dirToRemove.exists()) {
            FileUtils.forceDelete(dirToRemove);
        }
    }

    /**
     * Test split and routing slip WITH an '.end()' after the routing slip.
     *
     * The result is that the Inner processor gets called for EACH iteration within the split
     * but the Outer processor only gets called after the split is complete AND the exchange
     * is the one from before being split.
     *
     * This IS the desired behavior.
     *
     * @throws Exception
     */
    @Test
    public void testSplitByBodyAndRouteWithOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(1);
        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WEND);
        assertMockEndpointsSatisfied();
    }

    /**
     * Test split and routing slip WITHOUT an '.end()' after the routing slip.
     *
     * The result is that the inner and outer processors BOTH get called for EACH iteration within the split.
     *
     * This is NOT the desired effect.
     *
     * @throws Exception
     */
    @Test
    public void testSplitByBodyAndRouteWithIncorrectOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(3);
        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WOEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WOEND);
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Route 1: the routing slip is closed with its own end().
                from(TEST_IN_ENDPOINT_WEND).id(TEST_ROUTE_ID_WEND)
                    .split(body())
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("This is the INNER processor w/ end().");
                                Message in = exchange.getIn();
                                System.out.println("\tin=" + in);
                                Object body = in.getBody();
                                System.out.println("\tbody=" + body);
                                System.out.println("\tbody.class=" + body.getClass());
                            }
                        })
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        .end() // closes the routing slip
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end() // closes the split
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("This is the OUTER processor w/ end().");
                            Message in = exchange.getIn();
                            System.out.println("in=" + in);
                            Object body = in.getBody();
                            System.out.println("body=" + body);
                            System.out.println("body.class=" + body.getClass());
                        }
                    })
                    .to("mock:end")
                    .end()
                ;

                // Route 2: identical, except that the routing slip is NOT closed.
                from(TEST_IN_ENDPOINT_WOEND).id(TEST_ROUTE_ID_WOEND)
                    .split(body())
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("This is the INNER processor W/O end().");
                                Message in = exchange.getIn();
                                System.out.println("\tin=" + in);
                                Object body = in.getBody();
                                System.out.println("\tbody=" + body);
                                System.out.println("\tbody.class=" + body.getClass());
                            }
                        })
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WOEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        // .end()
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("This is the OUTER processor W/O end().");
                            Message in = exchange.getIn();
                            System.out.println("in=" + in);
                            Object body = in.getBody();
                            System.out.println("body=" + body);
                            System.out.println("body.class=" + body.getClass());
                        }
                    })
                    .to("mock:end")
                    .end()
                ;
            }
        };
    }
}
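
One way to picture the difference between the two routes above is a toy model in which every block-opening step pushes onto a stack and every end() pops the innermost open block. This is only a sketch that reproduces the observed test results. The class EndNestingSketch, the ToyRoute builder and its step() method are all made up for illustration and are not Camel classes, and nothing here claims to mirror Camel's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only: none of these types are Camel classes. */
public class EndNestingSketch {

    /** Hypothetical fluent builder that tracks which blocks are still open. */
    public static final class ToyRoute {
        private final Deque<String> openBlocks = new ArrayDeque<>();
        /** Steps that run once per split part. */
        public final List<String> perPart = new ArrayList<>();
        /** Steps that run once, after the split has completed. */
        public final List<String> afterSplit = new ArrayList<>();

        public ToyRoute split() { openBlocks.push("split"); return this; }

        public ToyRoute routingSlip() { openBlocks.push("routingSlip"); return this; }

        /** Closes the innermost open block, whichever one that is. */
        public ToyRoute end() {
            if (!openBlocks.isEmpty()) {
                openBlocks.pop();
            }
            return this;
        }

        /** Records a step as per-part while the split block is still open. */
        public ToyRoute step(String name) {
            (openBlocks.contains("split") ? perPart : afterSplit).add(name);
            return this;
        }
    }

    /** Mirrors the first route: the routing slip gets its own end(). */
    public static ToyRoute withEnd() {
        return new ToyRoute()
            .split()
                .step("inner")
                .routingSlip().end()   // closes the routing slip
                .step("log")
            .end()                     // closes the split
            .step("outer");
    }

    /** Mirrors the second route: the routing slip is never closed explicitly. */
    public static ToyRoute withoutEnd() {
        return new ToyRoute()
            .split()
                .step("inner")
                .routingSlip()
                .step("log")
            .end()                     // closes the routing slip, NOT the split
            .step("outer");
    }

    public static void main(String[] args) {
        ToyRoute a = withEnd();
        ToyRoute b = withoutEnd();
        System.out.println("with end():    perPart=" + a.perPart + " afterSplit=" + a.afterSplit);
        System.out.println("without end(): perPart=" + b.perPart + " afterSplit=" + b.afterSplit);
    }
}
```

In this model the "outer" step lands in afterSplit only when the routing slip is closed first. Without that end(), the single remaining end() is consumed by the routing slip, so "outer" stays inside the split and runs once per part, which matches the three interleaved Inner/Outer messages seen in the second test.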
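.end() and .endChoice() are still among the most confusing things :-(
My advice:
1) Think about how your route would be expressed in the Spring DSL. In this XML-based DSL you always have to delimit blocks (with closing tags):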
<route>
    <from uri="direct:a"/>
    <routingSlip ignoreInvalidEndpoints="true"> <!-- START OF BLOCK -->
        <header>myHeader</header>
    </routingSlip> <!-- END OF BLOCK -->
</route>
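Do the same in Java!
2) The fact (and the confusing part) is that for trivial processing (= what you often see in tutorials and non-real-life Camel examples), the Java DSL allows you to omit the end of a block: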
from("direct:a")
    .routingSlip(header("myHeader"))
    .ignoreInvalidEndpoints();
But the correct way is:
from("direct:a")
    .routingSlip(header("myHeader"))
    .ignoreInvalidEndpoints()
    .end();
3) I ran into exactly the same problem as you with the recipient list, which also needs to be closed!
.split(simple("${body}"))
    .streaming()
    .aggregate(simple("${body.blockId}"), new PutInBlockStrategy())
        .ignoreInvalidCorrelationKeys()
        .completionTimeout(5*1000)
        .log(TRACE, LOGNAME, "Next block:\n${body}")
        .recipientList( method(this, "getRecipents") ).end()
        .parallelProcessing()
    .end()
.end()
.log(INFO, LOGNAME, "File: ${headers.CamelFileName} successfully processed");
4) If in doubt, check the source code or the javadoc of the EIP pattern's definition to see whether it has an explicit end() method:
https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RoutingSlipDefinition.html#end()
https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RecipientListDefinition.html#end()
If it does, always end your blocks!
5) An interesting post: https://www.3riverdev.com/apache-camel-tips-caveats-from-the-trenches/