我尝试将Spring Cloud Contract Verifier Stub Runner的消息传递模块与Spring AMQP集成。我用MessageListener
创建了SimpleMessageListenerContainer
,它运行良好。但我有另一种方法。我使用AMQP支持Spring Integration with Inbound Channel Adapter来接收来自队列的消息。是否可以将Spring Cloud Contract Verifier Stub Runner的消息传递模块用于AmqpInboundChannelAdapter?
合同
Contract.make {
description 'insert new message'
label 'test.queue.insert'
input {
triggeredBy('insertMessage()')
}
outputMessage {
sentTo 'test.exchange'
headers {
header('contentType': 'application/json')
header('__TypeId__': 'sk.bulalak.messaging.demospringcloudcontractmessaging.Person')
}
body('''{"firstName": "Janko", "lastName": "Hrasko"}''')
}
}
应用
@SpringBootApplication
@EnableIntegration
public class DemoSpringCloudContractMessagingApplication {
private Log logger = LogFactory.getLog(getClass());
public static void main(String[] args) {
SpringApplication.run(DemoSpringCloudContractMessagingApplication.class, args);
}
@Bean
public Queue testQueue() {
return QueueBuilder.durable("test.queue").build();
}
@Bean
public Exchange testExchange() {
return ExchangeBuilder
.topicExchange("test.exchange")
.durable(true)
.build();
}
@Bean
public Binding testBinding() {
return BindingBuilder
.bind(testQueue())
.to(testExchange())
.with("#")
.noargs();
}
@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, testQueue()).messageConverter(messageConverter))
.handle(p -> logger.info(p.getPayload()))
.get();
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureStubRunner(ids = "org.example:test-service")
public class DemoSpringCloudContractMessagingApplicationTests {
@Autowired
private StubTrigger stubTrigger;
@Test
public void contextLoads() {
boolean result = stubTrigger.trigger("test.queue.insert");
assertTrue(result);
}
}
异常
java.lang.ClassCastException: org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener cannot be cast to org.springframework.amqp.core.MessageListener
at org.springframework.cloud.contract.verifier.messaging.amqp.SpringAmqpStubMessages.send(SpringAmqpStubMessages.java:100)
at org.springframework.cloud.contract.verifier.messaging.amqp.SpringAmqpStubMessages.send(SpringAmqpStubMessages.java:89)
at org.springframework.cloud.contract.stubrunner.StubRunnerExecutor.sendMessage(StubRunnerExecutor.java:235)
at org.springframework.cloud.contract.stubrunner.StubRunnerExecutor.triggerForDsls(StubRunnerExecutor.java:192)
at org.springframework.cloud.contract.stubrunner.StubRunnerExecutor.trigger(StubRunnerExecutor.java:178)
at org.springframework.cloud.contract.stubrunner.StubRunner.trigger(StubRunner.java:146)
at org.springframework.cloud.contract.stubrunner.BatchStubRunner.trigger(BatchStubRunner.java:131)
at sk.bulalak.messaging.demospringcloudcontractmessaging.DemoSpringCloudContractMessagingApplicationTests.contextLoads(DemoSpringCloudContractMessagingApplicationTests.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:73)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:83)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
禁用AMQP存根运行程序支持stubrunner.amqp.enabled=false
,只使用Spring INintegration。
自定义路线:
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">
<!-- REQUIRED FOR TESTING -->
<bridge input-channel="output"
output-channel="outputTest"/>
<channel id="outputTest">
<queue/>
</channel>
</beans:beans>
并使用该上下文运行测试
@ContextConfiguration(classes = Config, loader = SpringBootContextLoader)
@ImportResource("classpath*:integration-context.xml")
@AutoConfigureStubRunner
class IntegrationStubRunnerSpec extends Specification {