弹簧集成:将排队的消息分派给选择性消费者



我有一个弹簧集成流程,它会产生应该保留的消息,等待适当的消费者出现并使用它们。

@Bean
public IntegrationFlow messagesPerCustomerFlow() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/messages/{customer}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.requestPayloadType(JsonNode.class)
.headerExpression("customer", "#pathVariables.customer")
)
.channel(messagesPerCustomerQueue()) 
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(100);
}
@Bean
public QueueChannel messagesPerCustomerQueue() {
return MessageChannels.queue()
.get();
}

队列中的消息应通过 http 作为服务器发送的事件传递,如下所示。

PublisherSubscription 只是发布者和 IntegrationFlowTRegistration 的持有者,后者用于在不再需要动态创建的流时销毁动态创建的流(请注意,GET 的传入消息没有内容,Webflux 集成无法正确处理 ATM,因此需要一个小的解决方法来访问推送到customer标头的路径变量(:

@Bean
public IntegrationFlow eventMessagesPerCustomer() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events/{customer}")
.requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
.headerExpression("customer", "#pathVariables.customer")
.payloadExpression("''") // neeeded to make handle((p,h) work
)
.log()
.handle((p, h) -> {
String customer = h.get("customer").toString();
PublisherSubscription<JsonNode> publisherSubscription =
subscribeToMessagesPerCustomer(customer);
return Flux.from(publisherSubscription.getPublisher())
.map(Message::getPayload)
.doFinally(signalType ->
publisherSubscription.unsubscribe());
})
.get();
}

上述对服务器发送事件的请求动态注册一个流,该流按需订阅队列通道,由具有throwExceptionOnRejection(true)的过滤器实现的选择性消费者。遵循消息处理程序链的规范,该规范应确保将消息提供给所有使用者,直到使用者接受它。

public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
.filter("headers.customer=='" + customer + "'",
filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
.register();
return new PublisherSubscription<>(messagePublisher, registration);
}

此构造原则上有效,但存在以下问题:

在没有
  • 订阅者的情况下发送到队列的消息会导致MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
  • 在没有
  • 匹配的订阅者存在的情况下发送到队列的消息会导致AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed

我想要的是消息保留在队列中,并反复提供给所有订阅者,直到它被消费或过期(适当的选择性消费者(。我该怎么做?

请注意,GET的传入消息没有内容,Webflux集成无法正确处理ATM

我不明白这种担忧。

WebFluxInboundEndpoint使用此算法:

if (isReadable(request)) {
...
else {
return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}

方法真正GETelse分支的地方。而要发送的消息payload是一个MultiValueMap.而且我们最近与您一起修复了POST的问题,该问题已经在版本5.0.5中发布:https://jira.spring.io/browse/INT-4462

调度程序没有订阅者

原则上不能发生在QueueChannel上。那里根本没有任何调度员。它只是队列,发送方提供要存储的消息。您缺少其他与我们分享的内容。但是,让我们用它自己的名称来调用事物:messagesPerCustomerQueue不是应用程序中的QueueChannel

更新

关于:

我想要的是消息保留在队列中,并反复提供给所有订阅者,直到它被消费或过期(适当的选择性消费者(

我们只看到基于嵌入式 ActiveMQ 的PollableJmsChannel,用于支持消息的 TTL。作为此队列的使用者,您应该与setMinSubscribers(1)有一个PublishSubscribeChannel,以便在还没有订阅者时MessagingTemplate抛出MessageDeliveryException。这样,JMS 事务将被回滚,消息将在下一个轮询周期返回到队列。

内存中的问题QueueChannel没有事务性重新传递,并且一旦从该队列轮询消息,消息就会丢失。

另一个选项类似于JMS(事务性(是QueueChannelJdbcChannelMessageStore。虽然这样我们没有 TTL 功能...

相关内容

  • 没有找到相关文章

最新更新