如何使用Nest Js将消息发送到Kafka中的自定义分区



我试图将消息发送到右侧分区,这是我的NESTJS生产者控制器


constructor(
private readonly appService: AppService,
@Inject('any_name_i_want') private readonly client: ClientKafka,
) {}
async onModuleInit() {
['test2'].forEach((key) => this.client.subscribeToResponseOf(`${key}`));
await this.client.connect();
}
@Get('kafka-test-with-response')
async testKafkaWithResponse() {
const res = await this.client.send('test2', {
foo: 'bar',
data: new Date().toString(),
partition: 1,
});
return res;
}

我试着在NestJs文档中阅读,但它提供了进入Kafka网站的链接,我在Kafka文档中发现了这一点

const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'topic-name',
messages: [
{ key: 'key1', value: 'hello world', partition: 0 },
{ key: 'key2', value: 'hey hey!', partition: 1 }
],
})

在我检查了UI之后,Kafka在这里输入图像描述

似乎Producer仍然发送随机分区

我试图深入检查typescript send((函数,但它使用了任何函数,所以我真的不知道如何在发送函数中指定分区

NestJS确实使用了KafkaJS,所以您找到的代码部分是正确的,假设您可以获得一个原始的KafkaJS客户端实例。

或者,Kafka使用记录键对每个事件进行分区,并且在NestJS ProducerConfig中,您可以指定一个partitioner实例。https://github.com/nestjs/nest/blob/master/packages/microservices/external/kafka.interface.ts#L104

最新更新