Internal failure when using the Spring-Integration-Kinesis message-driven adapter



I have set up this KinesisMessageDrivenChannelAdapter:

@Bean
public KinesisMessageDrivenChannelAdapter kinesisInboundChannel(AmazonKinesis amazonKinesis, MetadataStore store) {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis, config.getStreamName());
    adapter.setCheckpointMode(CheckpointMode.batch);
    adapter.setListenerMode(ListenerMode.batch);
    adapter.setStartTimeout(10000);
    // The idle interval is given in milliseconds. getPollHours() must not exceed 596,
    // otherwise the multiplication overflows the int range.
    adapter.setIdleBetweenPolls(config.getPollHours() * 3_600_000);
    adapter.setShouldTrack(true);
    adapter.setDescribeStreamRetries(5);
    adapter.setConcurrency(50);
    adapter.setCheckpointStore(store);
    adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon());
    adapter.setOutputChannelName("logMessage.input");
    adapter.setErrorChannel(errorChannel());
    return adapter;
}
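
As a side note on the idle-interval comment above, here is a small standalone sketch of the arithmetic, assuming the poll interval is held as an int number of hours. The class and method names (IdleInterval, hoursToMillis, maxWholeHours) are hypothetical and not part of the adapter. 596 is the largest number of whole hours whose millisecond count still fits in an int, and Math.multiplyExact makes an overflow fail loudly where a plain multiplication would wrap around silently:

```java
public class IdleInterval {
    static final int MILLIS_PER_HOUR = 3_600_000;

    // Converts hours to milliseconds; throws ArithmeticException if the
    // result does not fit in an int (hypothetical helper, not adapter API).
    static int hoursToMillis(int hours) {
        return Math.multiplyExact(hours, MILLIS_PER_HOUR);
    }

    // Largest whole number of hours that can be expressed as int milliseconds.
    static int maxWholeHours() {
        return Integer.MAX_VALUE / MILLIS_PER_HOUR;
    }

    public static void main(String[] args) {
        System.out.println(maxWholeHours());
        System.out.println(hoursToMillis(596));
    }
}
```

With a helper like this, a misconfigured poll interval would show up as an exception at bean creation time rather than as a wrapped-around value.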

Most of the time it works fine. But every now and then I get a message like this:

Exception in thread "kinesisInboundChannel-kinesis-consumer-1" com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: c2f66be9-23f4-b211-9165-ed92383ee673)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1062)
at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1038)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.getRecords(KinesisMessageDrivenChannelAdapter.java:853)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.access$3500(KinesisMessageDrivenChannelAdapter.java:688)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer$2.run(KinesisMessageDrivenChannelAdapter.java:816)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1003)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

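After this, the adapter stops working altogether, although the application itself does not hang. I specified an error channel to use, and I would be happy to restart the application if that is what it takes to bring the adapter back online. But that does not seem to be an option.

How can I build error handling into this?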

It sounds like this problem has already been fixed here: https://github.com/spring-projects/spring-integration-aws/issues/84

You should consider using the latest version (2.0.0.M2), or even better, 2.0.0.BUILD-SNAPSHOT.
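
To illustrate the general idea of tolerating a transient failure, such as the 500 InternalFailure above, so that it does not end the consumer thread, here is a minimal sketch in plain Java. It is not the code from the linked fix and it does not use the adapter's API; ResilientPoller and callWithRetry are hypothetical names, and the failing task in main only simulates a service error:

```java
import java.util.concurrent.Callable;

public class ResilientPoller {

    // Calls the task until it succeeds or maxAttempts is used up,
    // sleeping a little longer after each failure. Rethrows the last failure.
    static <T> T callWithRetry(Callable<T> task, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt); // linear backoff
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated poll: fails twice, then returns a result.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated InternalFailure");
            }
            return "records";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Whether something like this can be applied around the adapter depends on the version in use, so upgrading remains the first thing to try.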
