我一直在尝试几种方法,通过使用AWS SDK for Java从SQS队列中检索所有消息,但都没有成功。我已经阅读了有关AWS SQS的分布式性质,以及消息存储在不同的服务器上。但我不明白的是,为什么这个架构没有向最终用户隐藏。我必须在Java代码中应用哪些技巧才能检索所有消息并100%确保没有遗漏任何消息?
我在"长轮询"中尝试过:
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
这与请求批处理/客户端缓冲有关:
// Create the basic Amazon SQS async client
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
// Create the buffered client
AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
CreateQueueResult res = bufferedSqs.createQueue(createRequest);
SendMessageRequest request = new SendMessageRequest();
String body = "test message_" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
SendMessageResult sendResult = bufferedSqs.sendMessage(request);
ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
.withMaxNumberOfMessages(10)
.withQueueUrl(queueUrl);
ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
List<Message> messages = rx.getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
但我仍然无法检索到所有消息。
知道吗?
AWS论坛对我的帖子保持沉默。
从SQS队列接收消息时,需要重复调用sqs:ReceiveMessage
。
在每次调用sqs:ReceiveMessage
时,您将从队列中获得0条或更多需要迭代的消息。对于每条消息,在处理完每条消息后,还需要调用sqs:DeleteMessage
将消息从队列中删除。
在上面的"长轮询"示例周围添加一个循环,以接收所有消息。
for (;;) {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
}
还要注意,您可能会多次收到同一条信息。因此,允许您的工作"重新处理"相同的消息,或者检测重复的消息。
我也面临同样的问题——只返回了一条消息,然后我尝试了receiveMessageRequest.setMaxNumberOfMessages(10),这将帮助我在一个循环中检索10条消息,
因为我的队列有500多条记录,所以我做的是
List<String> messagelist = new ArrayList<>();
try
{
AmazonSQS sqs = new AmazonSQSClient(credentials);
Region usWest2 = Region.getRegion(Regions.US_WEST_2);
sqs.setRegion(usWest2);
boolean flag = true;
while(flag)
{
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages)
{
// System.out.println(" Body: " + message.getBody());
messagelist.add( message.getBody());
String messageReceiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
}
if(messages.size()==0)
{
flag = false;
}
}
}
catch (AmazonServiceException ase) {
ase.printStackTrace();
} catch (AmazonClientException ace) {
ace.printStackTrace();
}
finally {
return messagelist ;
}
我正在从SQS读取记录,然后将其保存到字符串列表中,然后从队列中删除该记录。
所以最后我将把队列中的所有数据都放在一个列表
SQS队列不是数据库。您无法像尝试的那样将所有消息读取到列表中。队列没有开始也没有结束。您轮询队列并询问一些消息,它会返回一些消息(如果存在)。
如果你想要一个可以返回整个数据集的方法,那么sqs不是合适的工具——在这种情况下,传统的数据库可能会更好。
如果队列中没有消息,将等待长轮询。这意味着,如果您在循环中使用长轮询来调用ReceiveMessage,则可以保证您将获得所有消息。如果收到0封回复邮件,则表示您已收到所有邮件。
您提到您还使用了网络控制台。Web控制台的工作方式和使用SDK调用API相同。这意味着,当您在控制台中接收和看到消息时,在可见性超时到期之前,消息对其他客户端是不可见的。这可能就是你看不到消息的原因。
查看有关可见性超时的更多信息:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html