如何在现有的nodejs服务器上使用sqs消息



我想在SQS收到新消息时接收并触发电子邮件,现在我已经有nodejs服务器在运行,我该如何让它工作?我真的不想触发那个函数。但我希望当SQS中有新消息时,这个消费者会消费并执行发送电子邮件的业务逻辑。

但我的职能没有得到任何触发。注意:我不是在调用这个函数,我希望它在SQS有新消息时自动触发。

const AWS = require('aws-sdk');
const mongoose = require('mongoose');
//
// Configure the aws details
//
AWS.config.update({
region: process.env['AWS_REGION'],
accessKeyId: process.env['AWS_ACCESS_KEY_ID'],
secretAccessKey: process.env['AWS_SECRET_ACCESS_KEY']
});

const sqs = new AWS.SQS({apiVersion: '2012-11-05'});
var queueURL = "https://sqs.us-east-1.amazonaws.com/xxxxx/demo-lambda-to-email-sqs"

var params = {
AttributeNames: [
"SentTimestamp"
],
MaxNumberOfMessages: 1,
MessageAttributeNames: [
"All"
],
QueueUrl: queueURL,
VisibilityTimeout: 20,
WaitTimeSeconds: 0
};
sqs.receiveMessage(params, function(err, data) {
if (err) {
console.log("Receive Error", err);
} else if (data.Messages) {
console.log('--------------------------- MESSAGE RECEIVED -------------')
var deleteParams = {
QueueUrl: queueURL,
ReceiptHandle: data.Messages[0].ReceiptHandle
};
sqs.deleteMessage(deleteParams, function(err, data) {
if (err) {
console.log("Delete Error", err);
} else {
console.log("Message Deleted", data);
}
});
}
});

SQS是一个排队服务,因此它需要通过基于拉的机制而不是基于推的机制来使用。

只有当您具有轮询SQS队列的功能,然后在收到消息时触发该功能时,才能调用该功能。

如果您不想维护使用者脚本,您应该考虑将此脚本迁移到Lambda函数中。使用此选项时,Lambda服务将充当队列的使用者,并仅在添加消息时触发Lambda函数。

有关在SQS队列中使用AWS Lambda的更多信息,请参阅文档。

您只需要调用一次,而不需要使用长轮询。所以它开始了,得到了一个空的响应。因此,您需要一个基于拉的机制,一个基本的实现可以在SetInterval中运行receiveMessage。类似于:

setInterval(function() {
sqs.receiveMessage(params, function(err, data){
// your logic here!
});
}, 10000);
// Run every 10s

大于1s的WaitTimeSeconds可以进行长轮询,并通过消除空响应的数量来帮助降低使用Amazon SQS的成本。

以下代码每30秒调用一次SQS,请求消息。每次通话最多等待20秒才能接收信息。

const AWS = require('aws-sdk')
AWS.config.update({
region: 'us-east-1',
accessKeyId: '...',
secretAccessKey: '...'
})
const sqs = new AWS.SQS()
receiveMessage = () => sqs.receiveMessage({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/.../...',
WaitTimeSeconds: 20
}, (error, data) => {
if (error) console.error("ERROR:", error)
if (data.Messages) data.Messages.forEach(m => console.info(m.Body))
})
setInterval(receiveMessage, 30000)

最新更新