什么是 NestJS MQTT 微服务的有效@MessagePattern?



我正在尝试根据文档使用 NestJS 设置 MQTT 微服务。

我已经使用Docker启动了一个工作的Mosquitto代理,并使用各种MQTT客户端验证了它的可操作性。现在,当我启动 NestJS 服务时,它似乎连接正确(mqqt.fx 显示新客户端(,但我无法在我的控制器中接收任何消息。 这是我的引导,就像在文档中一样:

主目录

async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
host: 'localhost',
port: 1883,
protocol: 'tcp'
}
});
app.listen(() => console.log('Microservice is listening'));
}
bootstrap();

app.controller.ts

@Controller()
export class AppController {
@MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
root(msg: Buffer) {
console.log('received: ', msg)
}
}

我是否错误地使用了消息模式装饰器,或者我对 NestJS MQTT 微服务应该做什么的概念是错误的?我认为它可能会订阅我传递给装饰器的主题。我唯一的其他信息来源是相应的单元测试

嵌套.js模式处理程序

在 nest.js 侧,我们有以下模式处理程序:

@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}

正如@Alexandre所解释的,这实际上会听sum_ack


非嵌套.js客户端

一个非嵌套.js客户端可能看起来像这样(只需另存为客户端.js,运行npm install mqtt并使用node client.js运行程序(:

var mqtt = require('mqtt')
var client  = mqtt.connect('mqtt://localhost:1883')
client.on('connect', function () {
client.subscribe('sum_res', function (err) {
if (!err) {
client.publish('sum_ack', '{"data": [2, 3]}');
}
})
})
client.on('message', function (topic, message) {
console.log(message.toString())
client.end()
})

它发送有关主题sum_ack的消息,并侦听sum_res上的消息。当它在sum_res上收到一条消息时,它会记录该消息并结束程序。nest.js 期望消息格式{data: myData},然后调用参数处理程序sum(myData)

// Log:
{"err":null,"response":5} // This is the response from sum()
{"isDisposed":true} // Internal "complete event" (according to unit test)

当然,这不是很方便...


巢.js客户端

这是因为这是要与另一个嵌套客户端一起使用.js而不是普通的 mqtt 客户端。嵌套.js客户端将所有内部逻辑抽象出来。请参阅此答案,其中描述了 redis 的客户端(mqtt 只需要更改两行(。

async onModuleInit() {
await this.client.connect();
// no 'sum_ack' or {data: [0, 2, 3]} needed
this.client.send('sum', [0, 2, 3]).toPromise();
}

文档不是很清楚,但似乎对于 mqtt,如果您有@MessagePattern('mytopic'),您可以发布有关主题mytopic_ack的命令,并且您会在mytopic_res上得到响应。我仍在尝试找出如何从服务发布到 mqtt 代理。

请参阅 https://github.com/nestjs/nest/blob/e019afa472c432ffe9e7330dc786539221652412/packages/microservices/server/server-mqtt.ts#L99

public getAckQueueName(pattern: string): string {
return `${pattern}_ack`;
}
public getResQueueName(pattern: string): string {
return `${pattern}_res`;
}

@Tanas是对的。Nestjs/Microservice 现在可以侦听你的 $[topic] 并回答 $[topic]/reply。后缀_ack和_res已弃用。

例如:

@MessagePattern('helloWorld')
getHello(): string {
console.log("hello world")
return this.appService.getHello();
}

现在收听主题: helloWorld 现在在主题上回复helloWorld
/reply

关于身份证件

您还应该在有效负载中提供一个 id(见@Hakier(,Nestjs 将回复一个包含您的 id 的答案。 如果您没有任何 id,仍然不会有任何回复,但相应的逻辑仍会触发。

例如(使用从上面截取的(:
你的味精:

{"data":"foo","id":"bar"}

Nestjs 回复:

{"response":"Hello World!","isDisposed":true,"id":"bar"}

没有身份证件:

您的留言:

{"data":"foo"} or {}

没有回复,但终端中的你好世界

我今天在与 MQTT 作斗争,这对我有所帮助,但我遇到了更多问题,下面您可以看到我的发现:

配置代理 URL 的错误方式

就我使用非本地 MQTT 服务器而言,我从这个开始:

const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
host: 'test.mosquitto.org',
port: 1883,
protocol: 'tcp',
},
});
await app.listenAsync();

但就像您可以在 ServerMqtt 的构造函数中读取一样,它们仅使用url选项(如果未提供,它会回退到'mqtt://localhost:1883'.虽然我没有本地 MQTT,但它永远不会解析仅在连接上解析的app.listenAsync(),也不会运行任何处理程序。

当我调整代码以使用url选项时,它开始工作。

const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://test.mosquitto.org:1883',
},
});
await app.listenAsync();

消息需要id属性

第二个非常奇怪的问题是,当我使用来自@KimKern的非嵌套.js客户端脚本时,我必须注册两个消息模式:sumsum_ack

@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}
@MessagePattern('sum_ack')
sumAck(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}

当我使用console.log时,我发现后者正在运行,但仅在第一个存在时运行。您可以使用 mqtt cli 工具将相同的消息推送到代理进行检查:

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2]}'

但最大的问题是它没有回复(发布sum_res(。

解决方案是在发送消息时也提供id

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '{"data":[1,2], "id":"any-id"}'

然后我们可以删除"sum_ack"MessagePattern,只留下以下代码:

@MessagePattern('sum')
sum(data: number[]): number {
return data.reduce((a, b) => a + b, 0);
}

这样做的原因隐藏在ServerMqtthandleMessage方法中,如果消息没有id,则不会发布来自处理程序的响应。

TL/DR仅使用url选项指定消息代理的 URL,并始终为消息提供id

我希望这将为其他人节省一些时间。

快乐黑客!

最新更新