Nestjs 基于事件的消息传递,具有 1 个生产者和 2 个使用者



借助 nestjs 微服务,您可以使用基于请求/响应的方法发送消息并接收结果。这是通过@MessagePatternclient.send('my_pattern', myData)的组合来实现的。有关示例,请参阅 nest 文档:https://docs.nestjs.com/microservices/basics#request-response 和 https://docs.nestjs.com/microservices/basics#sending-messages。

如何在基于事件的方法中接收结果?

假设你有一个用户微服务和一个身份验证微服务。每当创建用户时,您都希望创建一个身份验证主题(保存用户名和密码的哈希,以便用户可以使用对身份验证微服务而不是用户服务的 api 请求登录(。

auth/auth.controller.ts

@EventPattern('EVT_USER_CREATED')
public async handleUserCreated(data: any): Promise<AuthSubject> {
if (!data.username || !data.password) {
throw new RpcException('Auth subject must supply username and password');
} 
const newSubject: CreateAuthSubject = {
username: data.username,
password: data.password,
email: data.email ?? '',
};
const sub = await this.subjectService.create(subject);
return sub;
}

user/user.controller.ts

@Post('')
@ApiBody({ type: CreateUser })
@ApiCreatedResponse({ type: User })
public async create(@Body() user: CreateUser): Promise<User> {
const newUser = await this.userService.create(user);
this.userQueue
.emit<any, CreateUser>('EVT_USER_CREATED', user)
.subscribe((res) => {
console.log(res);   // undefined
});
return newUser;
}

为了验证我的设置中没有错误,我将@EventPattern更改为@MessagePatternthis.userQueue.emit<...更改为this.userQueue.send<...。它有效,即 res 是一个有效的身份验证主题,如预期的那样具有用户名和密码。但是,使用此问题中概述的基于事件的方法res始终undefined(无论身份验证控制器是否返回或抛出handleUserCreated(。

最终,我想实现以下目标:如果另一个微服务需要处理"EVT_USER_CREATED"事件,我只需向其控制器添加一个@EventPattern('EVT_USER_CREATED') public async handleUserCreated方法。然后,可观察this.userQueue.emit<any, CreateUser>('EVT_USER_CREATED', user)将收到两个结果:使用用户创建事件的每个微服务一次。

因此,假设我添加了第三个微服务:负责保存付款信息、订单历史记录等的客户微服务。就像它订阅的身份验证服务一样,它订阅了"EVT_USER_CREATED"。

客户/客户.控制器.ts

@EventPattern('EVT_USER_CREATED')
public async handleUserCreated(data: any): Promise<AuthSubject> {    
const customer = await this.customerService.create(data);
return customer ;
}

现在,通过上述设置,微服务身份验证和客户将交替接收事件:如果用户微服务发出用户创建,则只有身份验证服务会对此做出反应并从帽子用户创建身份验证主体。不会为该用户创建任何客户。对于在用户微服务中创建的下一个用户,将仅创建客户,而不会创建身份验证主体。第三个创建的用户将再次由身份验证微服务使用,但不由客户微服务使用。等等。

-- auth microservice 
/
user microservice --- message broker ---

-- customer microservice

总结一下:我如何实现图中所示的消息传递体系结构,以便我只需要user.controller.ts中的一个emit(...)调用,从而在对emit(...)调用的订阅中显示两个响应?

可能有点晚了,但我把我的贡献留给未来的用户。

对于这种强烈基于事件的架构,建议使用诸如Kafka之类的消息传递代理,通过它可以将不同的服务订阅到不同的主题,甚至不同的服务也可以侦听同一主题,这样您就可以以不同的方式对同一事件做出反应。

Kafka 还提供了许多优势,当您想要扩展微服务时,这些优势非常有用,此外,它还在 Nest 中提供支持。 https://docs.nestjs.com/microservices/kafka

您应该记住的是,当使用消息传递系统时,通信通常是完全异步的,这是一个重要的细节,因为按照您提出的情况,您不仅通过 kafka 将消息发送到另一个微服务以验证用户的凭据就足够了,而且有必要向客户端返回响应。对于这种情况,nest 提供的工具是有限的,因为使用 @MessagePattern 装饰器,我们可以在控制器中接收来自 kafka 的消息,但我们不能等待(相同主题或另一个主题(的响应,确认成功响应客户请求。在这种情况下,您可以使用 Kafkajs (https://kafka.js.org/( 创建自己的传输器或自己的 kafka 客户端会很方便,以便在必要时保留用户的请求,直到收到另一个主题的确认。

我在某些论坛中看到的解决方案是将一个对象保存在内存(键/值(中,其中包含与用户 id 关联的请求的"响应"对象,这样您就可以通过 kafka 或任何其他具有特定操作的代理发送消息,并接收另一个控制器的确认, 检索用户的"请求"对象,然后发送响应(请求.发送(...((。这不是理想的解决方案,但在某些情况下有效。

例如:

@Controller('users')
class MyController {
constructor(private kafkaClient: KafkaClient) {}
private users: new Map<string, Request>() // Or private users: any = {}
onModuleInit() {
// By default, NestJs create topic "users-topic.reply"
this.kafkaClient.subscribeToResponseOf('users-topic')
}
@Post('auth')
authUser(@Req req: Request, @Res res: Response): void {
const userData = req.body;
const { id } = userData;
// Save request object in private variable;
this.users.set(id, res); // or this.users[id] = res;
// Send kafka message to validate user data
this.kafkaClient.emit('users-topic')
}
@MessagePattern('users-topic.reply')
authUser(@Payload() data: any): any {
if (data.message === 'success') {
const res:Request = this.users.get(data.id); // or this.users[data.id];

// Clean users object
this.users.remove(data.id); // or this.users[data.id] = null;
// Response client request
res.send('Authentication successfully')
}
}
}

相关内容

最新更新