我试图在BullMQ上创建一个结果队列,其中所有工作人员都可以发送添加具有特制jobIDs的工作的结果。其思想是,所有结果都生成一个特定的jobID,以便我确切地知道结果是哪个进程的。
我尝试过文档中描述的getNextJob
,没有运气。
我发现的解决方案是使用queuEvents
:每个进程在结果队列的waiting
状态上注册一个侦听器,当具有它需要的id的作业到达时,该进程获得具有getJob
的作业,读取结果数据并尝试将作业移动到完成。它可以工作,我可以正确地获取工人产生的结果。
我遇到的问题是将结果作业移动到已完成状态,因为我无法使用getJob
配置锁令牌,并且我收到Missing lock for job
错误,并且作业保持在活动状态。
这是我在进程
上使用的(伪)代码
const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
if (job.jobId === jobID){
debug(`Result job for ${jobID} received!`);
const resJob = await resQueue?.getJob(jobID) as Job;
queueEvents.removeListener('waiting', waitResult);
// THIS GENERATES the error
resJob?.moveToCompleted('Results received', token, false);
resolve(resJob?.data);
}
}
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );
有人知道如何正确处理moveToCompleted
吗?
您可以开发一个results
队列,像这样:const queue_Results = new Queue('Results');
,从那里您可以有一个worker处理事件,像这样const worker_Results = new Worker('Results', async (job: Job) => { // do something with the results from other jobs })
关于该方法的BullMQ文档,在这里