我有一个弹簧集成流程,它会产生应该保留的消息,等待适当的消费者出现并使用它们。
@Bean
public IntegrationFlow messagesPerCustomerFlow() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/messages/{customer}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.requestPayloadType(JsonNode.class)
.headerExpression("customer", "#pathVariables.customer")
)
.channel(messagesPerCustomerQueue())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(100);
}
@Bean
public QueueChannel messagesPerCustomerQueue() {
return MessageChannels.queue()
.get();
}
队列中的消息应通过 http 作为服务器发送的事件传递,如下所示。
PublisherSubscription 只是发布者和 IntegrationFlowTRegistration 的持有者,后者用于在不再需要动态创建的流时销毁动态创建的流(请注意,GET 的传入消息没有内容,Webflux 集成无法正确处理 ATM,因此需要一个小的解决方法来访问推送到customer
标头的路径变量(:
@Bean
public IntegrationFlow eventMessagesPerCustomer() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events/{customer}")
.requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
.headerExpression("customer", "#pathVariables.customer")
.payloadExpression("''") // neeeded to make handle((p,h) work
)
.log()
.handle((p, h) -> {
String customer = h.get("customer").toString();
PublisherSubscription<JsonNode> publisherSubscription =
subscribeToMessagesPerCustomer(customer);
return Flux.from(publisherSubscription.getPublisher())
.map(Message::getPayload)
.doFinally(signalType ->
publisherSubscription.unsubscribe());
})
.get();
}
上述对服务器发送事件的请求动态注册一个流,该流按需订阅队列通道,由具有throwExceptionOnRejection(true)
的过滤器实现的选择性消费者。遵循消息处理程序链的规范,该规范应确保将消息提供给所有使用者,直到使用者接受它。
public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
.filter("headers.customer=='" + customer + "'",
filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
.register();
return new PublisherSubscription<>(messagePublisher, registration);
}
此构造原则上有效,但存在以下问题:
在没有- 订阅者的情况下发送到队列的消息会导致
MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
在没有 - 匹配的订阅者存在的情况下发送到队列的消息会导致
AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed
。
我想要的是消息保留在队列中,并反复提供给所有订阅者,直到它被消费或过期(适当的选择性消费者(。我该怎么做?
请注意,GET的传入消息没有内容,Webflux集成无法正确处理ATM
我不明白这种担忧。
WebFluxInboundEndpoint
使用此算法:
if (isReadable(request)) {
...
else {
return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}
方法真正GET
去else
分支的地方。而要发送的消息payload
是一个MultiValueMap
.而且我们最近与您一起修复了POST
的问题,该问题已经在版本5.0.5
中发布:https://jira.spring.io/browse/INT-4462
调度程序没有订阅者
原则上不能发生在QueueChannel
上。那里根本没有任何调度员。它只是队列,发送方提供要存储的消息。您缺少其他与我们分享的内容。但是,让我们用它自己的名称来调用事物:messagesPerCustomerQueue
不是应用程序中的QueueChannel
。
更新
关于:
我想要的是消息保留在队列中,并反复提供给所有订阅者,直到它被消费或过期(适当的选择性消费者(
我们只看到基于嵌入式 ActiveMQ 的PollableJmsChannel
,用于支持消息的 TTL。作为此队列的使用者,您应该与setMinSubscribers(1)
有一个PublishSubscribeChannel
,以便在还没有订阅者时MessagingTemplate
抛出MessageDeliveryException
。这样,JMS 事务将被回滚,消息将在下一个轮询周期返回到队列。
内存中的问题QueueChannel
没有事务性重新传递,并且一旦从该队列轮询消息,消息就会丢失。
另一个选项类似于JMS(事务性(是QueueChannel
的JdbcChannelMessageStore
。虽然这样我们没有 TTL 功能...