获取发送到控制总线的消息的输出



我用Spring Integration DSL建立了一个控制总线:

// https://docs.spring.io/spring-integration/reference/html/control-bus.html
// https://stackoverflow.com/a/45269746/5873923
@Configuration
public class ControlBus {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(controlChannel()).controlBus().get();
}
@Bean
public MessageChannel controlChannel() {
return MessageChannels.direct().get();
}
}

有了这个,我可以启动/停止来自某些集成流的入站,使用:

controlChannel.send(new GenericMessage<>("@myInbound.start()"));
controlChannel.send(new GenericMessage<>("@myInbound.stop()"));

.send方法根据是否发送消息返回true或false。如何查看bean的状态?

controlChannel.send(new GenericMessage<>("@myInbound.isRunning()"));

也将返回真或假,

new MessagingTemplate().send(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));

将只发送消息而不返回任何内容

new MessagingTemplate().sendAndReceive(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));

发送消息并挂起等待响应。

如何正确配置控制总线的输出并返回它?

工作正常:

@SpringJUnitConfig
public class So74741707Tests {
@Autowired
@Qualifier("controlBusFlow.input")
MessageChannel controlBusFlowInput;
@Test
void receiveReplyFromControlBus() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
assertThat(
messagingTemplate.sendAndReceive(this.controlBusFlowInput,
new GenericMessage<>("@myEndpoint.isRunning()")))
.extracting(Message::getPayload)
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
.isFalse();
messagingTemplate.convertAndSend(this.controlBusFlowInput, "@myEndpoint.start()");
assertThat(
messagingTemplate.sendAndReceive(this.controlBusFlowInput,
new GenericMessage<>("@myEndpoint.isRunning()")))
.extracting(Message::getPayload)
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
.isTrue();
}
@Configuration
@EnableIntegration
public static class TestConfiguration {
@Bean
public IntegrationFlow controlBusFlow() {
return BaseIntegrationFlowDefinition::controlBus;
}
@ServiceActivator(inputChannel = "inputChannel", autoStartup = "false")
@EndpointId("myEndpoint")
public void myService(Object payload) {
}
}
}

您可能不仅仅是controlChannel上的控制总线订户。

start()stop()实际上是单向操作。因此,仅仅send()就足够了。isRunning()返回boolean,为了处理它,我们必须执行请求-应答操作或将应答消息发送到该控制总线端点的输出通道。

最新更新