Bullmq从队列中获取具有特定ID的作业



我试图在BullMQ上创建一个结果队列,其中所有工作人员都可以发送添加具有特制jobIDs的工作的结果。其思想是,所有结果都生成一个特定的jobID,以便我确切地知道结果是哪个进程的。

我尝试过文档中描述的getNextJob,没有运气。

我发现的解决方案是使用queuEvents:每个进程在结果队列的waiting状态上注册一个侦听器,当具有它需要的id的作业到达时,该进程获得具有getJob的作业,读取结果数据并尝试将作业移动到完成。它可以工作,我可以正确地获取工人产生的结果。

我遇到的问题是将结果作业移动到已完成状态,因为我无法使用getJob配置锁令牌,并且我收到Missing lock for job错误,并且作业保持在活动状态。

这是我在进程

上使用的(伪)代码

const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
if (job.jobId === jobID){
debug(`Result job for ${jobID} received!`);
const resJob = await resQueue?.getJob(jobID) as Job;
queueEvents.removeListener('waiting', waitResult);
// THIS GENERATES the error 
resJob?.moveToCompleted('Results received', token, false);
resolve(resJob?.data);
}
} 
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );

有人知道如何正确处理moveToCompleted吗?

您可以开发一个results队列,像这样:const queue_Results = new Queue('Results');,从那里您可以有一个worker处理事件,像这样const worker_Results = new Worker('Results', async (job: Job) => { // do something with the results from other jobs })

关于该方法的BullMQ文档,在这里

相关内容

  • 没有找到相关文章

最新更新