Nestjs kafka implementation



我读过nestjs微服务和kafka文档,但有些内容我想不出来。如果你能帮我,我将不胜感激。因此,正如文档所说,我必须在main.ts文件中创建一个microService,如下所示:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
await app.listen(() => console.log('app started'));

然后有一个kafkaModule文件,如下所示:

@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
})
export class KafkaModule implements OnModuleInit {
constructor(@Inject('HERO_SERVICE') private readonly clientService: KafkaClient)
async onModuleInit() {
await this.clientService.connect();
}
}

首先我想不出createMicroservice的第一个参数有什么用?(我通过了AppModule和KafkaModule,两者都正常工作。知道KafkaModule是在AppModule上导入的(

另一件事是,据我所知,main.ts文件中的微服务部分和配置用于订阅MessagePattern或EventPattern装饰器中使用的主题,而kafkaModule中描述的kafkaClient用于向不同的主题发送消息。

这里的问题是,如果我前面说的是真的,那么为什么clientModule在没有指定为使用者的情况下使用默认的groupId。奇怪的是,我找不到使用clientModule从任何主题获取任何消息的解决方案。我现在正在做的是在每个文件中使用不同的组ID,这样它们就不会有任何冲突。

createMicroservice的第一个参数,它将有助于指导消费者在您想要消费特定主题的消息时如何连接到Kafka。

示例:我们想从主题中获取消息:test01

我们如何申报?

import {Controller} from '@nestjs/common'
import {MessagePattern, Payload} from '@nestjs/microservices'
@Controller('sync')
export class SyncController {
@MessagePattern('test01')
handleTopicTest01(@Payload() message: Sync): any {
// Handle your message here
}
}

第二个块用作生产者,而不是消费者。当应用程序想要向特定主题发送消息时,clientModel将支持此操作。

@Get()
sayHello() {
return this.clientModule.send('say.hello', 'hello world')
}

相关内容

  • 没有找到相关文章

最新更新