我正试图用Bull框架将DTO发送到Redis队列,并在处理器中处理这些DTO。有时作业会传递到处理器(100个中的1个(,但大多数时候都失败了,并出现错误:job stalled more than allowable limit
和我不知道如何修复它。
我给你一个小介绍,下面你可以看到我的代码。我已经创建了queue-api
模块,它充当我的队列的包装器,例如订单队列。然后,我将该模块导入到要将DTO发布到队列中的模块,在我的例子中是order-module
。
队列api模块文件
// queue-api.module.ts
@Module({
imports: [
BullModule.registerQueue(
{
name: 'order-queue',
defaultJobOptions: {
backoff: 10000,
attempts: Number.MAX_SAFE_INTEGER,
},
},
),
...
],
providers: [OrderQueue],
exports: [OrderQueue],
})
export class QueueApiModule {}
// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
constructor(
@InjectQueue('order-queue')
private readonly queue: Queue,
) {}
async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
const job = await this.queue.add('send-submit-mail', dto)
console.log(`Job ${job.id} created.`)
}
}
订单模块文件
// order.module.ts
@Module({
imports: [
QueueApiModule,
...
],
providers: [
OrderProcessor,
...
]
})
export class OrderModule {}
// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
constructor(private readonly queue: OrderQueue) {}
@Process('send-submit-mail')
async onProcessSubmitMail(job: Job): Promise<void> {
console.log(`Processing of job ${job.id}`)
}
}
这个处理器处理程序几乎从未被调用过。
你知道我的代码出了什么问题吗?谢谢你的建议。
我遇到了类似的问题,但还没有深入了解根本原因。但在此期间,我使用了bull-repl-cli来查看队列状态。当发生停滞错误时,之后将不会触发任何作业(看起来队列卡在了失败的作业上(。如果你在公牛回复中运行统计数据,你会发现有一个作业处于活动状态。您可以手动删除它(使用bullrepl(,然后运行下一个作业。我怀疑QueueScheduler没有运行,因此没有处理停滞的作业。您还可以增加停滞超时参数(有2-3个,请检查[https://docs.bullmq.io/bull/important-notes])看看它是否有帮助。在我的情况下,锁定发生在我暂停调试时。
有点晚了,最好在这里写
这是因为这条线constructor(private readonly queue: OrderQueue) {}
更确切地说,这是因为DI机制,可能是Scope.REQUEST
服务的原因(或者它的一个注入服务,这使得主机服务也是Scope.REQUEST服务,整个注入子树都是请求范围的(
@Process()
在单独的进程中运行处理程序,因此无法访问Injector。
如果你看看试图处理job.data时产生的错误,你会看到这样的情况(在我的例子中,试图注入EmailService(:stacktrace ["TypeError: this.request.get is not a functionn at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)n at processTicksAndRejections (node:internal/process/task_queues:95:5)n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)n at async Promise.all (index 0)n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)","TypeError: this.request.get is not a functionn at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)n at processTicksAndRejections (node:internal/process/task_queues:95:5)n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)n at async Promise.all (index 0)n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)"]