当我在不同的函数上调用getJob时,我需要一种方法来阻止worker处理作业。我环顾四周,但找不到解决办法。
我有以下设置。在带有express的nodeJS中,我有worker节点。
- 使用延迟状态创建的作业
- 正在以不同的功能访问作业
async function jobReader(id) {
const job = await queue.getJob(id);
/* do some stuff */
await job.remove();
}
- 独立处理作业的Worker节点。只有在延迟时间结束后,才会处理作业
queue.process(async (job) => {
/* do some stuff */
})
queue.getJob(id)
不会阻止工人处理作业。因此,工人处理工作和jobReader处理工作之间存在竞争。我正在根据工作状态将一些结果写入DB。因此,比赛条件是不可接受的。
显然,getJob并没有阻止工人处理作业。如果作业是由其他具有getJob函数的函数读取的,是否有任何方法可以锁定或阻止该作业上的工人工作。
如有任何帮助或文件,我们将不胜感激。
感谢
我想您应该稍微更改一下您的体系结构。WorkerNode
完全按照它的目的执行,它接受jobs
并运行它们。因此,您不应该以某种方式阻止queue
,而应该只在用户批准/取消/失败时(或在120秒后没有发送响应(将job
添加到queue
。
如果我理解得对,这应该会让你知道如何控制不同请求之间的作业:
// this is YOUR queueu object. I don't now implentation but think
// of it like this..
const queue = new Queue()
// a variable holding the pending jobs which are not timeouted
// or explicitly approved/canceled/failed by user
const waitingJobs = {
}
// This could be your location where the user calls the api for creating a job.
app.post('/job', (req, res) => {
// create the job as the user requested it
const job = createJob(req)
// Add a timeout for 120 seconds into your waitingJobs array.
// So if the user does not respond after that time, the job will
// be added to queue! .
const timeout = setTimeout(() => {
queue.add(job)
// remove the reference after adding, garbage collection..
waitingJobs[job.id] = null
// job is added to queue automatically after 120 seconds
}, 120 * 1000)
// store the timeout in the job object!
job.timeout = timeout
// store the waiting job!
waitingJobs[job.id] = job
// respond to user, send back id so client can do another
// request if wanted.
req.status(200).json({ message: 'Job created!', id: job.id })
})
app.post('/job/:id', (req, res) => {
const id = req.params.id
if (!id) {
req.status(400).json('bad job id provided')
return
}
// get the queued job:
const job = waitingJobs[id]
if (!job) {
req.status(400).json('Job nod found OR job already processed. Job id: ' + id)
return
}
// now the user responded to a specific job, clean the
// timeout first, so it won't be added to queue!
if (job.timeout) {
clearTimeout(job.timeout)
}
// Now the job won't be processed somewhere else!
// you can do whatever you want...
// example:
// get the action
const action = req.query.action
if(!action) {
res.status(400).json('Bad action provided: ' + action)
return
}
if(action === 'APPROVE') {
// job approved! , add it to queue so worker node
// can process it..
queue.add(job)
}
if(action === 'CANCEL') {
// do something else...
}
/// etc..
// ofc clear the job reference after you did something..
waitingJobs[job.id] = null
// since everything worked, inform user the job will now be processed!
res.status(200).json('Job ' + job.id + 'Will now be processed')
})