无法在 Kinesis Binder 中将消息作为批处理模式使用



我正在尝试批量使用来自 Kinesis 流的消息

我正在使用

compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')

应用.yml

spring: 
  cloud:
    stream:
      bindings:
        input: 
          group: groupName
          destination: stream-name
          content-type: application/json
          consumer: 
           listenerMode: batch
           idleBetweenPolls: 10000

法典

根据文档,当侦听器模式是批处理时,它应该将列表作为有效负载

@StreamListener(Sink.INPUT)
public void message(List<String> messages) {
    System.out.println(messages);
}

我已发送消息{"ab":"aa"}

我得到消息错误,因为

org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
 at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access$4000(KinesisMessageDrivenChannelAdapter.java:86) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.performSend(KinesisMessageDrivenChannelAdapter.java:1010) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:980) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$processTask$1(KinesisMessageDrivenChannelAdapter.java:859) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1078) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
 at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:124) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:85) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    ... 18 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
 at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.handleNonArray(StringCollectionDeserializer.java:266) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:179) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:169) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:21) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3109) ~[jackson-databind-2.9.4.jar:2.9.4]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:114) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    ... 29 common frames omitted

您误读了文档;由于这是特定于 kinesis 的属性,因此您需要在 YAML 中使用 kinesis: 元素。

https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties

以下属性仅适用于 Kinesis 使用者,并且必须以 spring.cloud.stream.kinesis.bindings.<channelName>.consumer. 为前缀

最新更新