我用Spring Integration DSL建立了一个控制总线:
// https://docs.spring.io/spring-integration/reference/html/control-bus.html
// https://stackoverflow.com/a/45269746/5873923
@Configuration
public class ControlBus {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(controlChannel()).controlBus().get();
}
@Bean
public MessageChannel controlChannel() {
return MessageChannels.direct().get();
}
}
有了这个,我可以启动/停止来自某些集成流的入站,使用:
controlChannel.send(new GenericMessage<>("@myInbound.start()"));
controlChannel.send(new GenericMessage<>("@myInbound.stop()"));
.send
方法根据是否发送消息返回true或false。如何查看bean的状态?
controlChannel.send(new GenericMessage<>("@myInbound.isRunning()"));
也将返回真或假,
new MessagingTemplate().send(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));
将只发送消息而不返回任何内容
new MessagingTemplate().sendAndReceive(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));
发送消息并挂起等待响应。
如何正确配置控制总线的输出并返回它?
工作正常:
@SpringJUnitConfig
public class So74741707Tests {
@Autowired
@Qualifier("controlBusFlow.input")
MessageChannel controlBusFlowInput;
@Test
void receiveReplyFromControlBus() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
assertThat(
messagingTemplate.sendAndReceive(this.controlBusFlowInput,
new GenericMessage<>("@myEndpoint.isRunning()")))
.extracting(Message::getPayload)
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
.isFalse();
messagingTemplate.convertAndSend(this.controlBusFlowInput, "@myEndpoint.start()");
assertThat(
messagingTemplate.sendAndReceive(this.controlBusFlowInput,
new GenericMessage<>("@myEndpoint.isRunning()")))
.extracting(Message::getPayload)
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
.isTrue();
}
@Configuration
@EnableIntegration
public static class TestConfiguration {
@Bean
public IntegrationFlow controlBusFlow() {
return BaseIntegrationFlowDefinition::controlBus;
}
@ServiceActivator(inputChannel = "inputChannel", autoStartup = "false")
@EndpointId("myEndpoint")
public void myService(Object payload) {
}
}
}
您可能不仅仅是controlChannel
上的控制总线订户。
start()
和stop()
实际上是单向操作。因此,仅仅send()
就足够了。isRunning()
返回boolean
,为了处理它,我们必须执行请求-应答操作或将应答消息发送到该控制总线端点的输出通道。