亚马逊网络服务-AWS SQS Java.并非所有消息都从SQS队列中检索



我一直在尝试几种方法,通过使用AWS SDK for Java从SQS队列中检索所有消息,但都没有成功。我已经阅读了有关AWS SQS的分布式性质,以及消息存储在不同的服务器上。但我不明白的是,为什么这个架构没有向最终用户隐藏。我必须在Java代码中应用哪些技巧才能检索所有消息并100%确保没有遗漏任何消息?

我在"长轮询"中尝试过

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();

这与请求批处理/客户端缓冲有关:

    // Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
    CreateQueueResult res = bufferedSqs.createQueue(createRequest);
    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody( body );
    request.setQueueUrl(res.getQueueUrl());
    SendMessageResult sendResult = bufferedSqs.sendMessage(request);
    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(10)
    .withQueueUrl(queueUrl);
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
    System.out.println(" Message");
    System.out.println(" MessageId: " + message.getMessageId());
    System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
    System.out.println(" MD5OfBody: " + message.getMD5OfBody());
    System.out.println(" Body: " + message.getBody());
    for (Entry<String, String> entry : message.getAttributes().entrySet()) {
    System.out.println(" Attribute");
    System.out.println(" Name: " + entry.getKey());
    System.out.println(" Value: " + entry.getValue());
    }
    }

但我仍然无法检索到所有消息。

知道吗?

AWS论坛对我的帖子保持沉默。

从SQS队列接收消息时,需要重复调用sqs:ReceiveMessage

在每次调用sqs:ReceiveMessage时,您将从队列中获得0条或更多需要迭代的消息。对于每条消息,在处理完每条消息后,还需要调用sqs:DeleteMessage将消息从队列中删除。

在上面的"长轮询"示例周围添加一个循环,以接收所有消息。

for (;;) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();
}

还要注意,您可能会多次收到同一条信息。因此,允许您的工作"重新处理"相同的消息,或者检测重复的消息。

我也面临同样的问题——只返回了一条消息,然后我尝试了receiveMessageRequest.setMaxNumberOfMessages(10),这将帮助我在一个循环中检索10条消息,

因为我的队列有500多条记录,所以我做的是

    List<String> messagelist = new ArrayList<>();
    try
    {
        AmazonSQS sqs = new AmazonSQSClient(credentials);
        Region usWest2 = Region.getRegion(Regions.US_WEST_2);
        sqs.setRegion(usWest2);
        boolean flag = true;
        while(flag)
        {
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
            receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
            receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
            List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
            for (Message message : messages) 
                  {
                    //   System.out.println("    Body:          " + message.getBody());
                       messagelist.add( message.getBody());
                       String messageReceiptHandle = message.getReceiptHandle();
                       sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
                    }
            if(messages.size()==0)
            {
                flag = false;
            }
        }         
    }
    catch (AmazonServiceException ase) {
        ase.printStackTrace();
    } catch (AmazonClientException ace) {
       ace.printStackTrace();
    }
    finally {
        return messagelist ;
    }

我正在从SQS读取记录,然后将其保存到字符串列表中,然后从队列中删除该记录。

所以最后我将把队列中的所有数据都放在一个列表

SQS队列不是数据库。您无法像尝试的那样将所有消息读取到列表中。队列没有开始也没有结束。您轮询队列并询问一些消息,它会返回一些消息(如果存在)。

如果你想要一个可以返回整个数据集的方法,那么sqs不是合适的工具——在这种情况下,传统的数据库可能会更好。

如果队列中没有消息,将等待长轮询。这意味着,如果您在循环中使用长轮询来调用ReceiveMessage,则可以保证您将获得所有消息。如果收到0封回复邮件,则表示您已收到所有邮件。

您提到您还使用了网络控制台。Web控制台的工作方式和使用SDK调用API相同。这意味着,当您在控制台中接收和看到消息时,在可见性超时到期之前,消息对其他客户端是不可见的。这可能就是你看不到消息的原因。

查看有关可见性超时的更多信息:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html

最新更新