在获取不同函数上的作业时,公牛队列阻塞作业



当我在不同的函数上调用getJob时,我需要一种方法来阻止worker处理作业。我环顾四周,但找不到解决办法。

我有以下设置。在带有express的nodeJS中,我有worker节点。

  1. 使用延迟状态创建的作业
  2. 正在以不同的功能访问作业
async function jobReader(id) {
const job = await queue.getJob(id);
/* do some stuff */
await job.remove();
}
  1. 独立处理作业的Worker节点。只有在延迟时间结束后,才会处理作业
queue.process(async (job) => {
/* do some stuff */
})

queue.getJob(id)不会阻止工人处理作业。因此,工人处理工作和jobReader处理工作之间存在竞争。我正在根据工作状态将一些结果写入DB。因此,比赛条件是不可接受的。

显然,getJob并没有阻止工人处理作业。如果作业是由其他具有getJob函数的函数读取的,是否有任何方法可以锁定或阻止该作业上的工人工作。

如有任何帮助或文件,我们将不胜感激。

感谢

我想您应该稍微更改一下您的体系结构。WorkerNode完全按照它的目的执行,它接受jobs并运行它们。因此,您不应该以某种方式阻止queue,而应该只在用户批准/取消/失败时(或在120秒后没有发送响应(将job添加到queue

如果我理解得对,这应该会让你知道如何控制不同请求之间的作业:

// this is YOUR queueu object. I don't now implentation but think 
// of it like this..
const queue = new Queue()
// a variable holding the pending jobs which are not timeouted
// or explicitly approved/canceled/failed by user
const waitingJobs = {
}

// This could be your location where the user calls the api for creating a job.
app.post('/job', (req, res) => {
// create the job as the user requested it
const job = createJob(req)
// Add a timeout for 120 seconds into your waitingJobs array. 
// So if the user does not respond after that time, the job will 
// be added to queue! .
const timeout = setTimeout(() => {
queue.add(job)
// remove the reference after adding, garbage collection..
waitingJobs[job.id] = null
// job is added to queue automatically after 120 seconds
}, 120 * 1000)
// store the timeout in the job object!
job.timeout = timeout
// store the waiting job!
waitingJobs[job.id] = job
// respond to user, send back id so client can do another 
// request if wanted.
req.status(200).json({ message: 'Job created!', id: job.id })
})

app.post('/job/:id', (req, res) => {
const id = req.params.id
if (!id) {
req.status(400).json('bad job id provided')
return
}
// get the queued job:
const job = waitingJobs[id]
if (!job) {
req.status(400).json('Job nod found OR job already processed. Job id: ' + id)
return
}
// now the user responded to a specific job, clean the 
// timeout first, so it won't be added to queue!
if (job.timeout) {
clearTimeout(job.timeout)
}
// Now the job won't be processed somewhere else!
// you can do whatever you want...
// example:
// get the action
const action = req.query.action

if(!action) {
res.status(400).json('Bad action provided: ' + action)
return
}
if(action === 'APPROVE') {
// job approved! , add it to queue so worker node 
// can process it..
queue.add(job)
}
if(action === 'CANCEL') {
// do something else...
}
/// etc..

// ofc clear the job reference after you did something..
waitingJobs[job.id] = null

// since everything worked, inform user the job will now be processed!
res.status(200).json('Job ' + job.id + 'Will now be processed')
})

最新更新