入站适配器和控制总线延迟



我的集成流程代码是:

@Bean
public IntegrationFlow messageFlow() {
return IntegrationFlows.from(stompInboundChannelAdapter())
.transform(inBoundStompMsgTransformer::transform)
.headerFilter("stomp_subscription","content-length")
.handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
.get();
}

我正在使用Spring Boot。

日志清除指出{transformer}订阅者已添加到输入通道

2019-12-09 18:21:41.752  INFO 18248 --- [           main] o.s.i.s.i.StompInboundChannelAdapter     : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'

但是,我收到异常,丢失了队列中的前一条/两条消息。它处理剩余的消息。

假设在启动应用程序之前队列中有 10 条消息。启动应用程序后,即使日志显示已添加订阅者并且已启动 bean,我也会收到异常,发布异常,处理 8/9 消息。

例外情况是:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage

很明显,上下文尚未完全准备好处理消息,因此也未出现异常。但日志消息具有误导性。

我的第一个问题:

  1. 那么,添加订阅者并启动 bean 时到底意味着什么?这是否意味着一切都已设置,但上下文仍然必须准备好来处理消息?

为了克服这个问题,正如许多帖子中所建议的那样,我使用控制总线来启动适配器。其代码是:

......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {
@Autowired
private MessageChannel controlBusChannel;
@Override
public void start() {
System.out.println("Service starting...");
controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
}
.....

我创造了public class ApplicationLifeCycle implements SmartLifecycle认为它会很方便。

我的第二个问题是:

  1. 这是使用控制总线处理适配器启动/停止的正确/最佳方法吗?如果这不是正确的方法,那么请让我知道正确的方法。

谢谢

马赫什

我认为这是您另一个问题的延续:集成流Amqp通道适配器在handle((中不起作用

你有这个:

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
adapter.setOutputChannel(stompInputChannel());
adapter.setPayloadType(ByteString.class);
return adapter;
}

你在这里没有显示。

问题是您在IntegrationFlow中使用相同的定义。事实证明,StompInboundChannelAdapterbean 启动得更早,然后处理IntegationFlow,订阅.transform(inBoundStompMsgTransformer::transform)来处理传入的消息。

因此,如果您从stompInboundChannelAdapter()中删除该@Bean,它应该可以正常工作。我稍后会看看为什么MessageProducerSupport更早开始,然后IntegrationFlow...

最新更新