我已经在 spring 集成中配置了 tcp 调用,具有以程:网关(未来调用(====>分配器(带执行器(====>路由器(到2个不同的转换器(===>Outboud-gateway===>聚合器====>服务
聚合器在接收回复时配置,因为需要根据从第一次调用收到的特定值进行另一个 tcp 调用,然后发送到服务类。
在执行一段时间后(记录正确持久化(后,当一切都停止时,执行程序活动计数达到最大池大小并且执行程序队列包含排队的消息,并且一切都永远停止并且需要终止主进程时,我遇到了问题。
<int:gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway" async-executor="syncExecutor">
<int:method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="clientPositionsResponseChannel"/>
<int:method name="getSecurityData" request-channel="securityDataRequestChannel" reply-channel="clientPositionsResponseChannel"/>
</int:gateway>
<int:channel id="clientPositionsRequestChannel" >
</int:channel>
<int:channel id="securityDataRequestChannel" >
</int:channel>
<int:splitter input-channel="clientPositionsRequestChannel"
output-channel="singleClientPositionsRequestChannel"
/>
<int:channel id="singleClientPositionsRequestChannel" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:recipient-list-router input-channel="singleClientPositionsRequestChannel" >
<int:recipient channel="singleClientCommQueryChannel" />
<int:recipient channel="singleClientTransQueryChannel" />
</int:recipient-list-router>
<int:transformer
input-channel="singleClientTransQueryChannel"
output-channel="transQueryHeaderEnricherRequestChannel"
ref="dmPOSBaseTransQueryTransformer" order="2"/>
<int:header-enricher id="transQueryHeaderEnricher" input-channel="transQueryHeaderEnricherRequestChannel" output-channel="dmQueryRequestChannel" >
<int:header name="transQueryHeader" value="TRANS_QUERY_HEADER"/>
</int:header-enricher>
<int:channel id="transQueryHeaderEnricherRequestChannel" >
</int:channel>
<int:transformer
input-channel="singleClientCommQueryChannel"
output-channel="dmQueryRequestChannel"
ref="dmPOSBaseCommQueryTransformer" order="1"/>
<int:transformer
input-channel="securityDataRequestChannel"
output-channel="secQueryHeaderEnricherRequestChannel"
ref="dmSECBaseQueryTransformer" />
<int:header-enricher id="secQueryHeaderEnricher" input-channel="secQueryHeaderEnricherRequestChannel" output-channel="dmQueryRequestChannel" >
<int:header name="secQueryHeader" value="SEC_QUERY_HEADER"/>
</int:header-enricher>
<int:channel id="singleClientCommQueryChannel" >
</int:channel>
<int:channel id="transformSecurityDataRequestChannel" />
<int:channel id="singleClientTransQueryChannel" >
</int:channel>
<int:channel id="dmQueryRequestChannel" >
</int:channel>
<int:channel id="secQueryHeaderEnricherRequestChannel" >
</int:channel>
<ip:tcp-outbound-gateway id="dmServerGateway"
request-channel="dmQueryRequestChannel"
reply-channel="dmQueryResponseChannel"
connection-factory="csClient"
reply-timeout="3600000" request-timeout="3600000"
/>
<int:aggregator input-channel="dmQueryResponseChannel"
method="aggregateClientPositions"
ref="clientPositionsAggregator"
output-channel="aggregateDataResponseChannel"
correlation-strategy-expression="headers[id]"
release-strategy-expression="size() == 1"
send-partial-result-on-expiry="true" />
<int:service-activator method="createClientPosition" input-channel="aggregateDataResponseChannel" output-channel="clientPositionsResponseChannel" ref="clientPositionsService" >
</int:service-activator>
<int:channel id="dmQueryResponseChannel" >
</int:channel>
<int:channel id="securityDataResponseChannel" />
<int:channel id="aggregateDataResponseChannel" >
</int:channel>
<int:channel id="clientPositionsResponseChannel" >
以下是网关接口方法:未来> 获取客户位置(列出客户列表(;
列表获取安全数据(字符串符号(;
看起来您的TCP服务是瓶颈。
从另一边看,我有点不明白你的消息流:
- 你有
splitter
,所以你的有效载荷可能是一个List
- 您使用
recipient-list-router
,并在拆分后在此处复制每个项目 - 两个接收者最终都在
tcp:outbound-gateway
,但一个接一个,因为两者都在直接渠道内。 -
tcp:outbound-gateway
向aggregator
发送回复。但是最后一个具有奇怪的聚合配置:每个组在其id
仅包含一条消息,并且作为包含一条消息的列表完成 -
aggregator
将回复(超过service-activator
次(精确地发送到网关的reply-channel
那么,其他拆分的项目呢?它们将丢失,因为您的网关在第一条消息之后就已经得到了回复。所有其他工作似乎都是多余的...