我有一个问题,在一个服务中使用多个ClientKafka
,这是我的实现:
@Controller()
export class ApiController implements OnModuleInit {
constructor(
@Inject("ACCOUNT_SERVICE") private readonly accountKafkaClient: ClientKafka,
@Inject("WORKSPACE_SERVICE") private readonly workspaceKafkaClient: ClientKafka
) { }
async onModuleInit() {
const requestPatterns = [
'topic'
];
requestPatterns.forEach((pattern) => {
this.accountKafkaClient.subscribeToResponseOf(`account.${pattern}`);
});
await this.accountKafkaClient.connect();
}
async onModuleDestroy() {
await this.accountKafkaClient.close();
}
@Get()
async sendMessage() {
const data = {
msg: "account.topic"
}
const kafkaResponse = this.accountKafkaClient.send<any>('account.topic', JSON.stringify(data));
const response = await firstValueFrom(kafkaResponse);
const kafkaResponse2 = this.workspaceKafkaClient.send<any>('workspace.topic', JSON.stringify(response )) //THIS IS NOT RUNNING, WORKSPACE_SERVICE NOT RECEIVE ANY MESSAGE
return await firstValueFrom(kafkaResponse2);
}
}
谁能告诉我为什么workspaceKafkaClient
不发送任何消息给WORKSPACE_SERVICE
微服务?我尝试在accountKafkaClient
等onModule...
函数中传递此客户端,但它没有帮助我,
这里也是我的设置模块:
@Module({
imports: [
ClientsModule.register([
{
name: 'ACCOUNT_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'account_service',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'account-consumer',
},
},
},
{
name: 'WORKSPACE_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'workspace_service',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'workspace-consumer',
},
},
},
]),
],
controllers: [ApiController],
providers: [
ApiService,
// KafkaProducerProvider,
],
})
export class ApiModule {}
谢谢你的帮助!
每个应用只需要一个生产者客户端,但是Kafka生产者从不立即向代理发送数据。
你需要冲洗它们,这是await firstValueFrom(...)
应该做的,但是你没有展示那个方法。
否则,您似乎试图从一个主题获得响应以发送到另一个主题,这是消费者应该用于的,而不是阻塞一个生产者请求。