带有轮询器的弹簧集成桥未按预期为 JMS 工作



使用 spring-integration 5.0.7 来限制两个 JMS 队列之间的 msg 桥接。

文档位于: https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/messaging-channels-section.html#bridge-namespace

建议:

<int:bridge input-channel="pollable" output-channel="subscribable">
<int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>

但是模式验证器抱怨桥 elt 上的"没有嵌套轮询器允许可订阅的输入通道"。

但是,如果我将轮询器放在输入通道适配器上,如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
">
<int:channel id="inChannel" />
<int:channel id="outChannel" />
<int-jms:inbound-channel-adapter id="jmsIn" connection-factory="jmsConnectionFactory" destination-name="_dev.inQueue" channel="inChannel">
<int:poller fixed-delay="5000" max-messages-per-poll="2"/>
</int-jms:inbound-channel-adapter>
<int-jms:outbound-channel-adapter id="jmsOut" connection-factory="jmsConnectionFactory" destination-name="_dev.outQueue" channel="outChannel"/>
<int:bridge input-channel="inChannel" output-channel="outChannel">
</int:bridge>
</beans:beans>

没有任何东西从输入移动到输出。

如何使用速率限制从一个 JMS 队列桥接到另一个 JMS 队列?

更新:

打开日志记录会确认没有从输入通道拾取任何内容,但在其他方面没有帮助:

018-08-10 15:36:33.345 DEBUG 112066 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2018-08-10 15:36:38.113 DEBUG 112066 --- [ask-scheduler-2] o.s.integration.jms.DynamicJmsTemplate   : Executing callback on JMS Session: ActiveMQSession {id=ID:whitechapel-35247-1533940593148-3:2:1,started=true} java.lang.Object@5c278302
2018-08-10 15:36:38.116 DEBUG 112066 --- [ask-scheduler-2] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2018-08-10 15:36:43.115 DEBUG 112066 --- [ask-scheduler-1] o.s.integration.jms.DynamicJmsTemplate   : Executing callback on JMS Session: ActiveMQSession {id=ID:whitechapel-35247-1533940593148-3:3:1,started=true} java.lang.Object@1c09a81e
2018-08-10 15:36:43.118 DEBUG 112066 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'

这是一个Spring Boot应用程序,使用Java DSL配置,它与XML中的配置完全相同(减去桥接(;它工作正常。

@SpringBootApplication
public class So51792909Application {
private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);
public static void main(String[] args) {
SpringApplication.run(So51792909Application.class, args);
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
for (int i = 0; i < 10; i++) {
template.convertAndSend("foo", "test");
}
};
}
@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
.destination("foo"), e -> e
.poller(Pollers
.fixedDelay(5000)
.maxMessagesPerPoll(2)))
.handle(Jms.outboundAdapter(connectionFactory)
.destination("bar"))
.get();
}
@JmsListener(destination = "bar")
public void listen(String in) {
logger.info(in);
}
}

2018-08-10 19:38:52.534  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:52.543  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:57.566  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:57.582  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:02.608  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:02.622  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:07.640  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:07.653  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:12.672  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:12.687  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test

如您所见,使用者每 5 秒收到 2 条消息。

调试日志意味着队列中没有消息。

编辑

我想通了;XML 解析器将 JmsTemplate receiveTimeout 设置为 nowait (-1(。由于您没有使用缓存连接工厂,因此我们永远不会收到消息,因为如果客户端中尚不存在消息,ActiveMQ 客户端会立即返回(请参阅此答案(。由于没有缓存,我们在每次投票中都会得到一个新的消费者(并且每次都进行无等待接收(。

DSL 保留 JmsTemplate 的默认值(无限等待 - 这实际上是错误的,因为如果没有消息,它会无限期地阻塞轮询器线程(。

若要修复 XML 版本,添加receive-timeout="1000"可以修复它。

但是,最好使用CachingConnectionFactory以避免在每次轮询上创建新的连接/会话/使用者。

不幸的是,配置CachingConnectionFactory会关闭Spring Boot的自动配置。此问题在引导 2.1 中已修复。

我在这里打开了一个问题来解决DSL和XML之间的不一致。

如果您坚持使用DSL,我建议您将接收超时设置为合理的值,而不是无限期:

@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
.configureJmsTemplate(t -> t.receiveTimeout(1000))
.destination("foo"), e -> e
.poller(Pollers
.fixedDelay(5000)
.maxMessagesPerPoll(2)))
.handle(Jms.outboundAdapter(connectionFactory)
.destination("bar"))
.get();
}

但是,最好的解决方案是使用CachingConnectionFactory.

最新更新