Spring Integration挂起,聚合器未从其输入通道轮询



我们使用的是队列通道&JdbcMessageStore支持的聚合器。

在一段时间内,所有功能都很好,随着请求的增加,来自聚合器输入通道的消息不会被聚合器轮询。应用程序重新启动后,将进行聚合。

请帮忙。我们是走线了还是发生了什么?

这是的配置结构

<channel id="startChannel">
	<queue message-store="channelStore" />
</channel>
<bridge input-channel="startChannel"
	output-channel="routerChannel" />
<channel id="routerChannel">
	<queue message-store="channelStore" />
</channel>
<int:router input-channel="routerChannel"
	expression="payload.status">
	<int:mapping value="chunk completed" channel="loadData" />
	<int:mapping value="job completed" channel="aggregateData" />
	<int:mapping value="Failed" channel="errorChannel" />
</int:router>
<task:executor id="workerThreadExecutor" pool-size="8"
	queue-capacity="40" rejection-policy="DISCARD" />
<channel id="loadData">
	<dispatcher task-executor="workerThreadExecutor" />
</channel>
<service-activator id="dataServiceActivator"
	input-channel="loadData" method="loadUserDetails"
	output-channel="aggregateData">
	<beans:bean class="com.sample.DataServiceActivator" />
</service-activator>
<channel id="aggregateData">
	<queue message-store="channelStore" />
</channel>
<aggregator id="aggregator" input-channel="aggregateData"
	output-channel="completionChannel"
	release-strategy="releaseStrategyBean"
	release-strategy-method="canRelease"
	correlation-strategy-expression="headers.userId" ref="aggregatorBean"
	method="aggregateChunks" send-partial-result-on-expiry="true"
	message-store="persistentMessageStore"
	expire-groups-upon-completion="true" group-timeout="7200000"
	expire-groups-upon-timeout="true" lock-registry="lockRegistry" />
<!--
-- releaseStrategyBean - release when data count equals to total data
sent by 'Job Completed' message.
-- aggregatorBean - with aggregated
messages generates a report - batch job - could take some time to
complete.

-->
<channel id="completionChannel">
	<queue message-store="channelStore" />
</channel>

仅凭配置很难说发生了什么,但看起来您的rejection-policy="DISCARD"不太好:当一个新任务被拒绝时,您将丢失数据,大小为40的内部队列已经满了。考虑改用CALLER_RUNS

此外,还不清楚为什么一个人总是只有队列通道介于两者之间。。。例如,为什么routerChannel也是一个队列,当startChannel已经是一个队列时。。。当一些进程可以简单地发生在同一个线程和调用堆栈上时,没有理由通过队列通道处理所有事情。

我还担心队列通道,因为所有这些通道都是可轮询通道,并且您配置了一些轮询器。这一个基于TaskScheduler,默认情况下池只有10个线程:https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/configuration.html#namespace-任务调度器。因此,这可能会使您陷入这样的情况:所有这些调度线程都很忙,并且没有能力从队列通道轮询更多消息。

最新更新