如何暂停Nest.js Kafka消费者



我在Nest.js项目中使用Kafka.js。这就是我初始化KafkaClient:的方式


@Module({
...
providers: [{
provide: 'KAFKA_CLIENT',
useFactory: async (configService: KafkaClientConfigService) => {
const kafkaOptions = configService.getKafkaOptions();
return ClientProxyFactory.create(kafkaOptions);
},
inject: [KafkaClientConfigService],
},
]
...
})

现在,我将KafkaClient注入到我的控制器中,并且我希望以一定的间隔使用消息。虽然有一种方法可以使用consumer.pause()来使用Kafka.js,但我在KafkaClient中找不到任何对该选项的引用。

有没有暂停或限制消费者的选择?

ClientKafka具有protected consumer属性。在使用者本身上,您可以调用pause()resume()

为了使用这些方法,我目前正在用一个自定义类扩展ClientKafka,并将其用作提供程序。

import { ClientKafka, KafkaOptions } from '@nestjs/microservices'
export class KafkaClient extends ClientKafka {
pause() {
this.consumer.pause()
// ...
}
}

相关内容

  • 没有找到相关文章

最新更新