骆驼建议不要用指定的文件替换端点



我对Camel很陌生,尽管已经阅读了Apache Camel文档,但我仍然停留在我希望是一个我忽略的微不足道的问题上。

我有一个Spring Boot应用程序,它定义了一个Camel路由,该路由从HTTP调用中以csv格式使用实时价格,使用Bindy将CSV转换为POJO(LivePrice),然后将数据保存到商店。

下面是路由定义:

@Component
public class LivePricesCSVRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("kafka:" + "{{kafka.topic.live.prices.csv}}" + "{{kafka.broker.location}}")
.routeId("live.prices-persistence-route")
.transacted()
.unmarshal()
.bindy(BindyType.Csv, LivePrice.class).id("convertToCsv")
.process(exchange -> {
List<LivePrice> object = (List<LivePrice>) exchange.getIn().getBody();
object.remove(0); // omit the header
logger.info(object);
})
.bean("livePriceServiceImpl", "populateLivePrices").id("populateLivePrices");
}
}

我想为这条路线创建一个集成测试,其中我提供了一个包含两行和一个标题的测试 csv 文件作为输入,而不是期望在主题 kafka.topic.live.prices.csv 上收到消息。

Date,Symbol,Open,
2019-07-09,BTCUSD,12347.18
2019-07-08,BTCUSD,11475.07

我还想在 Exchange 持久化之前拦截它并将其发送到端点 mock:output,在那里我可以执行断言。

这是我写的测试:


@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@MockEndpoints
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class LivePricesPersistenceRouteTest {
@Autowired
CamelContext camelContext;
@EndpointInject(uri = "mock:output")
private MockEndpoint mockOutput;
@Test
public void testSendLivePricesCsvToTopic() throws Exception {
camelContext.getRouteDefinition("live-prices-persistence-route")
.adviceWith(camelContext, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
replaceFromWith("file://testCsvFile.csv");                      
intercept()
.to("mock:output");
}
});
camelContext.start();
Exchange exchange = mockOutput.assertExchangeReceived(0);
List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
assertThat(livePrices.get(0).getDate(), is("2019-07-09"));        
// TODO ADD MORE ASSERTIONS
mockOutput.assertIsSatisfied();
}
}

当我运行测试时,驼峰会记录以下内容:

2019-07-13 14:35:16.587  INFO 90356 --- [           main] org.apache.camel.model.RouteDefinition   : Adviced route before/after as XML:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="live-prices-persistence-route">
<from uri="kafka:{{kafka.topic.live.prices.csv}}{{kafka.broker.location}}"/>
<transacted>
<unmarshal customId="true" id="convertToCsv">
<bindy type="Csv"/>
</unmarshal>
<process/>
<bean customId="true" id="populateLivePrices" method="populateLivePrices" ref="livePriceServiceImpl"/>
</transacted>
</route>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="live-prices-persistence-route">
<from uri="file://testCsvFile.csv"/>
<intercept>
<to uri="mock:output"/>
</intercept>
<transacted>
<unmarshal customId="true" id="convertToCsv">
<bindy type="Csv"/>
</unmarshal>
<process/>
<bean customId="true" id="populateLivePrices" method="populateLivePrices" ref="livePriceServiceImpl"/>
</transacted>
</route>

但是,测试失败并显示以下输出:

java.lang.AssertionError: mock://output Not enough messages received. Was: 0
at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:1494)
at org.apache.camel.component.mock.MockEndpoint.assertTrue(MockEndpoint.java:1482)
at org.apache.camel.component.mock.MockEndpoint.assertExchangeReceived(MockEndpoint.java:1078)
at com.xxx.liveprices.routes.LivePricesPersistenceRouteTest.testSendLivePricesCsvToTopic(LivePricesPersistenceRouteTest.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

任何人都可以指导我为什么不使用该文件以及为什么 Exchange 没有被拦截并发送到模拟端点?

经过更多的阅读,我仍然无法确定为什么以下代码没有读取和交换我的输入数据:

replaceFromWith("file://testCsvFile.csv");

相反,我选择将CSV文件的内容作为字符串提供,并使用weaveById来替换输入数据。

以下是我完成目标的测试:

@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@MockEndpoints
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class LivePricesPersistenceRouteTest {
@Autowired
CamelContext camelContext;
@Autowired
ProducerTemplate producerTemplate;
@EndpointInject(uri = "mock:output")
private MockEndpoint mockOutput;
@Test
public void testSendLivePricesCsvToTopic() throws Exception {
camelContext.getRouteDefinition("live-prices-persistence-route")
.adviceWith(camelContext, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
replaceFromWith("direct:test");
weaveById("populateLivePrices").replace().inOut("mock:output");
}
});
camelContext.start();
String message = "Date,Symbol,Open,n" +
"2019-07-09,BTCUSD,12347.18n" +
"2019-07-08,BTCUSD,11475.07";
producerTemplate.sendBody("direct:test", message);
Exchange exchange = mockOutput.assertExchangeReceived(0);
List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
assertThat(livePrices.get(0).getDate(), is("2019-07-09"));
assertThat(livePrices.get(0).getOpen(), is("12347.18"));
assertThat(livePrices.get(1).getDate(), is("2019-07-08"));
assertThat(livePrices.get(1).getOpen(), is("11475.07"));
mockOutput.assertIsSatisfied();
}
}

我知道这是一个老问题,但万一其他人遇到同样的问题......

我看了MockEndpoint.assertExchangeReceived()的实施.这种方法没有任何内置的等待机制,不像MockEndpoint.assertIsSatisfied(),它将等待可配置的时间量来接收预期的交换次数(可通过resultWaitTime配置)。从camelContext.start()mockOutput.assertExchangeReceived(0)所需的微秒显然不足以让Camel处理您的输入文件。

你几乎做对了,但assertIsSatisfied()的立场为时已晚,缺少实际的期望。这是我认为应该有效的:

mockOutput.expectedMessageCount(1);
camelContext.start();
mockOutput.assertIsSatisfied();
// if the statement above passes, the endpoint is guaranteed
// to have received at least one exchange
Exchange exchange = mockOutput.assertExchangeReceived(0);
List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
assertThat(livePrices.get(0).getDate(), is("2019-07-09"));        
// TODO ADD MORE ASSERTIONS

最新更新