SQS侦听器@headers获取body内容而不是消息属性



我正在使用Spring Cloud SQS消息传递来聆听指定的队列。因此,使用@sqslistener注释如下:

    @SqsListener(value = "${QUEUE}", deletionPolicy = SqsMessageDeletionPolicy.ALWAYS )
    public void receive(@Headers Map<String, String> header, @Payload String message)  {
        try {
            logger.logInfo("Message payload is: "+message);
            logger.logInfo("Header from SQS is: "+header);
            if(<Some condition>){
                //Dequeue the message once message is processed successfully
                awsSQSAsync.deleteMessage(header.get(LOOKUP_DESTINATION), header.get(RECEIPT_HANDLE));
            }else{
                logger.logInfo("Message with header: " + header + " FAILED to process");
                logger.logError(FLEX_TH_SQS001);
            }
        } catch (Exception e) {
            logger.logError(FLEX_TH_SQS001, e);
        }       
    }

我能够成功连接指定的队列并阅读消息。在发送消息之前,我将消息属性设置为" key1" =" value1"以及AWS控制台中的消息。以下是消息主体:

{
"service": "ecsservice"
}

我希望"标题"能够收到所有消息属性的地图以及一个(即key1 and value1(。但是我收到的是:{service = ecsservice}作为填充的地图。

这意味着有效载荷/消息的主体正在成为标头的一部分,尽管身体正常。

我想知道我在做什么错误,@header标头无法获得正确的消息属性。

寻求专家建议。

-pc

我在我的一个春季项目中遇到了相同的问题。对我来说,问题是,QueueMessageHandlerFactory的SQS配置与设置setArgumentResolvers

默认情况下,弹簧中的第一个参数解析器是PayloadArgumentResolver。以下行为

@Override
    public boolean supportsParameter(MethodParameter parameter) {
        return (parameter.hasParameterAnnotation(Payload.class) || this.useDefaultResolution);
    }

在这里,默认情况下,this.useDefaultResolution设置为true - 这意味着任何参数都可以转换为有效载荷。

和Spring试图将您的方法与一个解析器之一匹配,(首先是PayloadArgumentResolver( - 的确,它将尝试将所有参数转换为Payload

春季的源代码:

@Nullable
    private HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
        HandlerMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
        if (result == null) {
            for (HandlerMethodArgumentResolver resolver : this.argumentResolvers) {
                if (resolver.supportsParameter(parameter)) {
                    result = resolver;
                    this.argumentResolverCache.put(parameter, result);
                    break;
                }
            }
        }
        return result;
    }

我如何解决这个问题,

弹簧解析器的默认行为

factory.setArgumentResolvers(
            listOf(
                new PayloadArgumentResolver(converter, null, false),
                new HeaderMethodArgumentResolver(null, null)
            )
        )

我设置的位置,默认标志为false和spring只有在参数上有注释时才尝试转换为有效载荷。

希望这会有所帮助。

除了@sqslistener,您需要将@messagemagping添加到该方法中。此注释将有助于解决方法参数。

我有这个问题是在相当大的代码库中工作的。事实证明,正在将HandlethodargumentResolver添加到用于基本上用来将消息解析到参数中的解析器列表中。就我而言,这是PayloadArgumentResolver,通常总是将论点解析为有效载荷,而不管注释如何。默认情况下,它应该在列表中最后一次,但是由于我不知道的代码,它最终被添加到前面。

无论如何,如果您不确定要查看您的代码,看看您是否对Spring的QueueMessageHandler或HandlethodargumentResolver做任何事情。

它帮助我使用了调试器并查看handlermethodargumentresolver.resolveargument方法开始追踪发生的事情。

P.S。我认为您的@sqslistener代码看起来不错,除了我认为@headers应该从技术上解析为Lt;字符串,对象>",但我不确定这会导致您看到的问题。

创建queuemessagingTemplate和杰克逊到消息转换器bean: -

@Bean
QueueMessagingTemplate queueMessagingTemplate() {
     return new QueueMessagingTemplate(AmazonSQSAsyncClientBuilder.standard().withRegion("ap-south-1").build());
}
@Bean
MappingJackson2MessageConverter messageConverter() {
     return new MappingJackson2MessageConverter();
}

最新更新