我在Nest.js项目中使用Kafka.js。这就是我初始化KafkaClient:的方式
@Module({
...
providers: [{
provide: 'KAFKA_CLIENT',
useFactory: async (configService: KafkaClientConfigService) => {
const kafkaOptions = configService.getKafkaOptions();
return ClientProxyFactory.create(kafkaOptions);
},
inject: [KafkaClientConfigService],
},
]
...
})
现在,我将KafkaClient注入到我的控制器中,并且我希望以一定的间隔使用消息。虽然有一种方法可以使用consumer.pause()
来使用Kafka.js,但我在KafkaClient中找不到任何对该选项的引用。
有没有暂停或限制消费者的选择?
ClientKafka
具有protected consumer
属性。在使用者本身上,您可以调用pause()
和resume()
。
为了使用这些方法,我目前正在用一个自定义类扩展ClientKafka
,并将其用作提供程序。
import { ClientKafka, KafkaOptions } from '@nestjs/microservices'
export class KafkaClient extends ClientKafka {
pause() {
this.consumer.pause()
// ...
}
}