我在生产者-消费者模型中使用BullMQ、Redis和MySQL来处理作业。
我有一个看起来像这样的制作人:
const jobOptions = {
removeOnComplete: true, // remove job if complete
delay: 60000,
attempts: 3
};
const sasWorkerProducerService = ({redisConnection}) => {
const queueLogId = async (jobId, logIds) => {
for(const logId of logIds) {
redisConnection.add({
jobId: jobId,
scraperPriceLogId: logId
}, jobOptions);
}
}
return {
queueLogId
}
}
module.exports = sasWorkerProducerService;
我有一个工人服务来处理这些工作:
const Bull = require('bull');
const connectQueue = (name) => new Bull(name, {
redis: {
port: 6379, host: 'xxxx', password: 'xxxx'
},
settings: {
maxStalledCount: 5
}
})
module.exports = { connectQueue }
const nameQueue = 'sas-worker'
const cases = await connectQueue(nameQueue)
const initJob = () => {
console.info('job is working!');
cases.process('__default__', 300, processJob);
cases.on('failed', handlerFailure);
cases.on('completed', handlerCompleted);
cases.on('stalled', handlerStalled);
}
initJob()
请注意,生产者发送了一个jobId
作为有效负载的一部分。这个ID是我生成并存储在MySQL数据库中的标识符。作业表示需要处理的一批项目。我不在乎一批作业的完成顺序。
但是,一旦给定jobId
的所有作业都已完成,我如何确定?在所有的工作都处理好之后,我需要做一些工作。
我知道生产者-消费者模型的本质是对一件商品进行处理,然后忘记它,但在所有商品都经过处理后,我如何才能为一件商品做一些最终的后处理工作?
你可以在你的消费者身上做类似的事情(在伪代码中(:
async function processJob(bullJob) {
// Process the bullJob
// Atomically update job by jobId in MySQL, decrease number of items left to process
// Select job from MySQL by jobSid to see if any items left
// If yes - react somehow
// If not - exit
}