多线程执行器通道,以加快使用者进程



>我有一个消息生产者,每秒产生大约 15 条消息

消费者是一个 Spring 集成项目,它从消息队列中使用并执行大量处理。目前它是单线程的,无法与生产者发送消息的速率相匹配。因此,队列深度不断增加

return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();

是否可以使用执行器通道在多线程环境中处理此问题并加快使用者进程

如果是,请建议我如何实现 - 为了确保消息的顺序,我需要将相同类型的消息(基于消息的 id(分配给执行器通道的同一线程。

[更新代码]我创建了以下执行器通道

public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
.executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
.executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

现在,我从主消息流重定向到自定义路由器,该路由器将消息转发到执行器通道,如下所示 -

@Bean
public IntegrationFlow baseEventFlow1() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").route(route()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains(""id":"RPA")) {
return Collections.singletonList(RPA_DEFAULT_CHANNEL);
} else if (message.getPayload().toString().contains(""id":"SKW")) {
return Collections.singletonList(SKW_DEFAULT_CHANNEL);
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}

我将为相应的执行器通道提供单独的消费者流。

请纠正我的低调

[更新]

@Bean
@BridgeTo("uaxDefaultChannel")
public MessageChannel ucaDefaultChannel() {
return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel ualDefaultChannel() {
return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
}

所以执行器通道上的桥到会转发消息

因此队列深度不断增加

由于看起来您的队列在 JMS 代理上的某个地方,因此具有这样的行为确实是可以的。这正是消息传递系统的设计 - 区分生产者和消费者,并尽可能处理目的地中的消息。

如果要增加来自 JMS 的轮询,可以考虑在 JMS 容器上有一个concurrency选项:

/**
* The concurrency to use.
* @param concurrency the concurrency.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrency(String)
*/
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
this.target.setConcurrency(concurrency);
return this;
}
/**
* The concurrent consumers number to use.
* @param concurrentConsumers the concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
this.target.setConcurrentConsumers(concurrentConsumers);
return this;
}
/**
* The max for concurrent consumers number to use.
* @param maxConcurrentConsumers the max concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
return this;
}

有关详细信息,请参阅文档:https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/integration.html#jms-receiving

但这不允许您"将消息分配给特定线程"。就像没有办法在JMS中进行分区一样。

我们可以通过 Spring 集成来做到这一点,根据您的"基于消息的 id"使用router和配置了单线程Executor的特定ExecutorChannel实例。每个ExecutorChannel都将是其专用的执行器,只有一个线程。这样,您将确保具有相同分区键的消息的顺序,并且您将并行处理它们。所有ExecutorChannel都可以具有相同的订阅者或bridge到同一通道进行处理。

但是,您需要记住,当您离开 JMS 侦听器线程时,您完成了 JMS 事务,并且您无法在该单独的线程中处理消息,您可能会丢失一条消息。

最新更新