NodeJS Bull 停止作业上的队列作业失败



我的NodeJS项目中有多个公牛队列,如果上一个队列成功执行,它们将运行。 我正在尝试验证这里的一些电子邮件地址。

  1. 检查电子邮件格式(格式队列)

  2. 使用npm email-existence包的电子邮件存在(存在队列)

formatQueue是更少的时间过程,这将运行RegEx并验证Email格式。 但email-existence包大约需要 5-10 秒才能完成。

formatQueue如果像 20-100 这样的jobs较少,existenceQueue可以正常工作。 但是当我一次添加大约 1000 个作业时,existenceQueue失败并出现以下错误

myemail@email.com job stalled more than allowable limit

我在这里和这里检查了这个问题,我认为这个过程需要很长时间才能做出回应,所以添加了limiter,如这里所指。但这对我没有帮助。

如果任何队列中的作业失败,则不会处理下一个作业。它将止步于此,其他工作将留在waiting状态。

我的代码类似于下面的代码。 请帮助我解决问题。

队列.js

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");
// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
emails.forEach(element => {
formatQueue.add(element, { attempts: 3, backoff: 1000 });
});
}
// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function(job, done){
FormatTest.validate(job.data, (err, data) => {
if(err) done();
else{
job.data = data;
done();
}
});
});
// Existence Test Process
formatQueue.process(function(job, done){
ExistenceTest.validate(job.data, (err, data) => {
if(err) done();
else{
job.data = data;
done();
}
});
});

// ------------ On Cmplete Handlers ------------
formatQueue.on('completed', function(job){
if(job.data.is_well_format){
existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
}else QueueModel.lastStep(job.data)
});
existenceQueue.on('completed', function(job){
QueueModel.lastStep(job.data)
});

// ------------ To update the emaile ------------
module.exports.lastStep = (data) => {
Emails.updateEmail(data, (err, updated) => {
if(!err) {
formatQueue.clean('completed');
existenceQueue.clean('completed');
}
})
}

---------更新---------

处理器需要太多时间来响应,因此由于我使用超时,作业变得stalled或失败。

我正在尝试在不同的processor文件中运行process,因为它在bull文档中,我添加了如下文件。

// -------- Queue.js ----------
formatQueue.process(__dirname+"/processors/format-worker.js");

// On Cmplete Handler
formatQueue.on('completed', function(job, result){
console.log(result, "Format-Complete-job"); // result is undefined
if(job.data.is_well_format){
existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
}else QueueModel.lastStep(job.data)
});
// -------- Queue.js ends ---------
//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
Validator.Format.validate(job.data, (data) => {
job.data = data;
return Promise.resolve(data);
});
}

现在,在我之前使用的作业完成时,我曾经使用更新的作业参数获取作业数据。现在我没有获得更新的工作数据。文档中的第二个参数,即resultundefined。 现在,在这种情况下,我如何获取更新的作业数据。

尝试可重复的作业

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");
// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
emails.forEach(element => {
let jobOptions = {
repeat: {
every: 10 * 1000, // Run job every 10 seconds for example
limit: 3 // Maximum number of times a job can repeat.
},
jobId: someUniqueId, // important do not forget this
removeOnComplete: true, // removes job from queue on success (if required)
removeOnFail: true // removes job from queue on failure (if required)
}
formatQueue.add(element, jobOptions);
});
}
// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function (job, done) {
FormatTest.validate(job.data, (err, data) => {
if (err) {
// Done with error
done(true);
} else {
job.data = data;
// Done without any error
done(false);
}
});
});
// Existence Test Process
existenceQueue.process(function (job, done) {
ExistenceTest.validate(job.data, (err, data) => {
if (err) {
// Done with error
done(true);
} else {
job.data = data;
// Done without any error
done(false);
}
});
});

// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
if (job.data.is_well_format) {
let jobOptions = {
repeat: {
every: 10 * 1000, // Run job every 10 seconds for example
limit: 3 // Maximum number of times a job can repeat.
},
jobId: someUniqueId, // important do not forget this
removeOnComplete: true, // removes job from queue on success (if required)
removeOnFail: true // removes job from queue on failure (if required)
}
existenceQueue.add(job.data, jobOptions);
} else QueueModel.lastStep(job.data)
});
existenceQueue.on('completed', function (job) {
QueueModel.lastStep(job.data)
});

// ------------ To update the email ------------
module.exports.lastStep = (data) => {
Emails.updateEmail(data, (err, updated) => {
if (!err) {
formatQueue.clean('completed');
existenceQueue.clean('completed');
}
})
}

相关内容

  • 没有找到相关文章

最新更新