在Spring Integration,DSL中,如何指定对已发布频道的订阅



当使用Spring Integration DSL构建器模式时,它通常会填充元素之间所需的通道"自动";。然而,有时情况并非如此。

在高层,包装应用程序将元数据保存在数据库中,以便根据需要在我们以前(可能(从未见过的平台上动态创建和销毁流。因此,流不适合使用静态符号(如@Bean(进行实例化,而是必须在运行时动态创建和销毁,并在spring上下文中注册/注销。

我在动态创建的主流中使用了一个已发布的消息通道,在动态创建子流中也有一个通道,但我看不到如何从子流订阅mainPublishChannel。

这让我把消息推送到频道,但如果没有订阅,什么都不会发生。

提前谢谢。


一些先前的研究(不是详尽的列表:

https://github.com/spring-projects/spring-integration-flow

https://dzone.com/articles/spring-integration-building

https://xpadro.com/2014/05/spring-integration-4-0-a-complete-xml-free-example.html

Spring集成网关";Dispatcher没有订阅者";

Spring Integration-如何调试';Dispatcher没有订阅服务器';?


日志剪切

task-scheduler-1 2020-12-31 00:25:32,526 INFO  o.s.i.g.GatewayProxyFactoryBean - started b653ca1c-038d-4567-bd4e-4c16ecc502a3.org.springframework.integration.config.ConsumerEndpointFactoryBean#3#gpfb
task-scheduler-1 2020-12-31 00:25:32,538 DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{... timestamp=1609395932538}]
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.d.BroadcastingDispatcher - No subscribers, default behavior is ignore
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{aaa=ee}], headers={aaa=ee, sequenceNumber=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, sequenceSize=2, yyy=2020-12-24 11:15:30.915278, correlationId=0eef0e4e-768c-90db-fa7b-2d1767335b26, timestamp=1609395932538}]

代码段:

String channelId=getId().toString()+'.'+"mainPublishChannel";
MessageChannel channel = MessageChannels.publishSubscribe(channelId, stepTaskExecutor).get();
final IntegrationFlowBuilder bldr = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(pollerFixedRate, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get("xxx")")
.headerExpression("yyy", "payload[0].get("yyy")"))
.split(tableSplitter)
.gateway(channel)   
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules, channel)
) 
.aggregate()
.handle(cleanupAdapter)
;
...
snip
...
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules, MessageChannel publishedChannel) {
// ??? How to subscribe this to publishedChannel??
recipientListSpec
.recipient(MessageChannels.publishSubscribe(this.getId().toString()+'.'+"mainReceiveChannel", stepTaskExecutor).get());

rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));

recipientListSpec
.ignoreSendFailures(true)
.defaultOutputToParentFlow();

return recipientListSpec;
}

publishedChannel必须作为输入通道传递给子流

return flowDef
.channel(receiveChannel) //  <---- This is the reference to the main publish channel in the child flow, which allows the builder to create the subscription
.log()
.handle(inboundAdapter)
... snip ...
;

最新更新