Dispatcher failed to deliver Message(调度器无法传递消息)— Spring Integration



我正在使用 Spring Integration 开发我的第一个消息流(flow)应用程序。简而言之,我的目标是从 tar.gz 压缩包中提取 JSON 文件。我已经实现了这个功能,并且运行良好,但在流程的最后,网关抛出了异常,而不是正常结束。

上下文是这样的:

<int:poller default="true" fixed-delay="0" />
<int:gateway id="extractedFolderProcess" default-reply-channel="endFlow"
default-request-channel="inputChannelDatasetPath"
error-channel="error_channel"
service-interface="com.javarticles.spring.integration.utilities.ExtractedFolderProcess">
</int:gateway>
<int:channel id="error_channel"/>
<int:filter id="nullMessage" input-channel="error_channel" ref="errorFlow" />

<int:channel id="inputChannelDatasetPath"/>
<int:channel id="endFlow" />
<int:channel id="filesIn" >
<int:queue />
</int:channel>
<int:channel id="jsonChannel">
<int:queue />
</int:channel>
<int:channel id="extractedJsonChannel">
<int:queue />
</int:channel>
<task:executor id="executor" pool-size="2" />
<int:splitter id="unzipSplitter" input-channel="inputChannelDatasetPath"  ref="unziperDataset" output-channel="filesIn"/>
<int:splitter id="testSplitter" input-channel="filesIn" ref="jsonSplitter" output-channel="jsonChannel">
<int:poller task-executor="executor" fixed-delay="1000" /> 
</int:splitter>
<int:aggregator input-channel="jsonChannel" ref="jsonAggregator" output-channel="extractedJsonChannel" method="add"/>
<int:aggregator input-channel="extractedJsonChannel" 
ref="sumAggregator" output-channel="endFlow"    
method="add"
release-strategy="releaseStrategySumAggregator" 
correlation-strategy="correlationIdStrategySumAggregator" />
<bean id="errorFlow" class="com.javarticles.spring.integration.endPoints.ErrorFlow" />
<bean id="releaseStrategySumAggregator" class="com.javarticles.spring.integration.utilities.ReleaseStrategySumAggregator" />
<bean id="correlationIdStrategySumAggregator" class="com.javarticles.spring.integration.utilities.CorrelationIdStrategySumAggregator" />
<bean id="jsonSplitter" class="com.javarticles.spring.integration.endPoints.JsonSplitter" />
<bean id="unziperDataset" class="com.javarticles.spring.integration.endPoints.UnzipDataset" />
<bean id="jsonAggregator" class="com.javarticles.spring.integration.endPoints.JsonAggregator"/>
<bean id="sumAggregator" class="com.javarticles.spring.integration.endPoints.SumAggregator" />

堆栈跟踪(stack trace)如下:

2018-09-10 14:28:36 ERROR LoggingHandler:192 - org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=15878, headers={sequenceNumber=5875, correlationId=0, check=0, id=06f00498-e3ea-2dd0-e5e0-5663974e0e45, sequenceSize=5875, timestamp=1536582516348}]
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:671)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:129)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 33 more

可能第二个聚合器发送的消息有一个错误的标头,但我不确定。我该如何解决?

您的配置存在多个问题。您的 filesIn、jsonChannel 和 extractedJsonChannel 都是可轮询(pollable)的通道,但是,我看到的唯一显式配置的轮询消费者是 testSplitter。所以我真的看不出这些聚合器是如何获得消息的。

作为起点,我建议将这些通道更改为直接通道(direct channel),即删除其中的 <queue> 子元素。

最新更新