从BullMQ完成的作业中检索结果的推荐方法是什么



我使用BullMQ和express服务器异步处理作业,但对如何从已完成的作业中检索结果感到困惑。

我目前正在做的是侦听作业完成状态事件,并将这些结果存储在以作业Id为关键字的对象中,并在需要时从该对象检索结果。有推荐的方法吗?

我查看了BullMQ文档,但找不到任何关于如何检索结果的信息。

这是示例代码:

server.js

// Kick off a new job by adding it to the work queue
app.post("/api/submitjob", async (req, res) => {
let job = await workQueue.add();
res.json({ id: job.id });
});
app.get("/api/jobstatus/:id", async (req, res) => {
let id = req.params.id;
let job = await workQueue.getJob(id);
if (job === null) {
res.status(404).end();
} else {
let state = await job.getState();
let reason = job.failedReason;
res.json({ id, state, progress, reason, result: jobIdResultMap[id] });
}
});
// You can listen to global events to get notified when jobs are processed
workQueue.on('global:completed', (jobId, result) => {
logger.log('info', `${jobId} succesfully completed`);
jobIdResultMap[jobId] = JSON.parse(result);
});
app.listen(PORT, () => console.log(`✅  API Server started: http://${HOST}:${PORT}/api/v1/endpoint`));

worker.js:

let throng = require("throng");
let Queue = require("bull");
// Connect to a local redis instance locally, and the Heroku-provided URL in production
let REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
// Spin up multiple processes to handle jobs to take advantage of more CPU cores
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
let workers = process.env.WEB_CONCURRENCY || 2;
// The maximum number of jobs each worker should process at once. This will need
// to be tuned for your application. If each job is mostly waiting on network
// responses it can be much higher. If each job is CPU-intensive, it might need
// to be much lower.
let maxJobsPerWorker = 50;
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function start() {
// Connect to the named work queue
let workQueue = new Queue("work", REDIS_URL);
workQueue.process(maxJobsPerWorker, async (job) => {
// This is an example job that just slowly reports on progress
// while doing no work. Replace this with your own job logic.
let progress = 0;
await sleep(50);
// A job can return values that will be stored in Redis as JSON
// This return value is unused in this demo application.
return { value: "This will be stored" };
});
}
// Initialize the clustered worker process
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
throng({ workers, start });

建议使用作业队列和消息队列。

在编写BullMQ的文档时,它是不完整的,因此您应该看看Bull的文档。

来自文件-

返回作业完成

一种常见的模式是,您有一个队列处理器集群,它们只会以最快的速度处理作业,还有一些其他服务需要获取这些处理器的结果并对其进行处理,可能会将结果存储在数据库中。

实现这一点的最稳健和可扩展的方法是将标准作业队列与消息队列模式相结合:服务只需打开作业队列并向其中添加作业,就可以向集群发送作业,集群将以最快的速度开始处理。每当集群中的作业完成时,都会向结果消息队列发送一条消息,其中包含结果数据,该队列将由其他服务侦听,该服务将结果存储在数据库中。

最新更新