我无法收到订阅消息。我做错了什么?
我能够成功连接到代理（broker），只是在 client.updates.listen 的回调中收不到任何消息。
另外，MqttPublishMessage 和 MqttSubscribeMessage 的区别是什么？
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
/// Shared MQTT client used by [main] and the connection callbacks.
///
/// Targets the broker at `ws://192.168.43.56` on port 8080. The client
/// identifier is spelled `client_id` so that it agrees with the identifier
/// carried by the connect message.
final client = MqttServerClient.withPort(
  'ws://192.168.43.56',
  'client_id',
  8080,
);
/// Connects to the broker, subscribes to a single topic and prints every
/// message received on it while sleeping for 120 seconds.
///
/// Returns `-1` when the connection could not be established, and `0` after
/// the listening period has elapsed and the client has been disconnected.
Future<int> main() async {
  // One keep-alive value (seconds) shared by the client and the connect
  // message, so the two settings cannot disagree.
  const keepAliveSeconds = 20;
  const topic = '/busline/201'; // Not a wildcard topic

  client
    // Set logging on if needed, defaults to off.
    ..logging(on: false)
    ..keepAlivePeriod = keepAliveSeconds
    ..port = 8080
    ..useWebSocket = true
    ..onDisconnected = onDisconnected
    ..onConnected = onConnected
    ..onSubscribed = onSubscribed
    ..pongCallback = pong;

  print('EXAMPLE::Mosquitto client connecting....');
  client.connectionMessage = MqttConnectMessage()
      // Reuse the client's own identifier instead of a second literal.
      .withClientIdentifier(client.clientIdentifier)
      .keepAliveFor(keepAliveSeconds)
      .startClean() // Non persistent session for testing
      .withWillQos(MqttQos.atLeastOnce);

  try {
    await client.connect();
  } on Exception catch (e) {
    print('EXAMPLE::client exception - $e');
    client.disconnect();
  }

  // Check we are connected. `?.` keeps this valid whether or not the
  // installed mqtt_client declares `connectionStatus` as nullable.
  if (client.connectionStatus?.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto client connected');
  } else {
    // Use status here rather than state if you also want the broker return
    // code.
    print(
        'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    return -1;
  }

  // Ok, lets try a subscription.
  print('EXAMPLE::Subscribing to the $topic topic');
  client.subscribe(topic, MqttQos.exactlyOnce);

  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    // Handle every message in the notification, not only the first one.
    for (final received in c) {
      // Explicit cast: the payload is declared as the MqttMessage base type.
      final recMess = received.payload as MqttPublishMessage;
      final pt =
          MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
      print(
          'EXAMPLE::Change notification:: topic is <${received.topic}>, payload is <-- $pt -->');
      print('');
    }
  });

  print('EXAMPLE::Sleeping....');
  await MqttUtilities.asyncSleep(120);

  // Release the broker connection before reporting success.
  client.disconnect();
  return 0;
}
/// Subscription callback: prints a confirmation line naming [topic].
void onSubscribed(String topic) =>
    print('EXAMPLE::Subscription confirmed for topic $topic');
/// Disconnect callback.
///
/// Always prints a disconnection line, then prints a second line when the
/// disconnection origin recorded on the shared client is
/// [MqttDisconnectionOrigin.solicited].
void onDisconnected() {
  print('EXAMPLE::OnDisconnected client callback - Client disconnection');
  // `?.` avoids dereferencing a missing connection status; a missing status
  // simply does not count as a solicited disconnect.
  final origin = client.connectionStatus?.disconnectionOrigin;
  if (origin == MqttDisconnectionOrigin.solicited) {
    print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
  }
}
/// Connect callback: prints a single line announcing the connection.
void onConnected() {
  const announcement =
      'EXAMPLE::OnConnected client callback - Client connection was sucessful';
  print(announcement);
}
/// Ping-response callback: prints a single notification line.
void pong() => print('EXAMPLE::Ping response client callback invoked');
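你可以这样做：先连接，连接成功后再订阅主题，并通过 client.updates 监听收到的消息。至于两者的区别：MqttSubscribeMessage 是客户端发给代理、用于请求订阅某个主题的报文；MqttPublishMessage 则携带在某个主题上发布的实际内容，client.updates 中收到的就是后者。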
/// Connect callback: prints `Connected`.
void onConnected() => print('Connected');
/// Disconnect callback: prints `Disconnected`.
void onDisconnected() => print('Disconnected');
/// Subscription-success callback: prints the name of the subscribed [topic].
void onSubscribed(String topic) {
  final report = 'Subscribed topic: $topic';
  print(report);
}
/// Subscription-failure callback: prints the [topic] that could not be
/// subscribed to.
void onSubscribeFail(String topic) => print('Failed to subscribe $topic');
/// Unsubscribe-success callback: prints the name of the [topic] that was
/// unsubscribed.
void onUnsubscribed(String topic) {
  final report = 'Unsubscribed topic: $topic';
  print(report);
}
/// Ping-response callback: prints a single notification line.
void pong() => print('Ping response client callback invoked');
/// Most recently decoded JSON payload received by the listener that
/// [connect] installs.
///
/// Deliberately `dynamic`: its shape is whatever `jsonDecode` produces for the
/// incoming payload.
dynamic data;

/// Creates an MQTT client, connects it, subscribes to `topic/` and stores each
/// successfully decoded JSON payload in [data].
///
/// Always returns the client. When connecting or subscribing throws an
/// [Exception], the exception is printed, the client is disconnected and it
/// is returned without a message listener attached.
Future<MqttServerClient> connect() async {
  final client = MqttServerClient.withPort(
    'broker',
    'client something unique',
    port,
  );
  client
    ..logging(on: false)
    ..onConnected = onConnected
    ..onDisconnected = onDisconnected
    // ..onUnsubscribed = onUnsubscribed
    ..onSubscribed = onSubscribed
    ..onSubscribeFail = onSubscribeFail
    ..pongCallback = pong;

  client.connectionMessage = MqttConnectMessage()
      .authenticateAs('Enter Client Id', 'Enter Password')
      // Reuse the client's own identifier so the connect message can never
      // disagree with the identifier the client was created with.
      .withClientIdentifier(client.clientIdentifier)
      .startClean()
      // .withWillRetain()
      .withWillQos(MqttQos.atLeastOnce);

  try {
    await client.connect();
    // client.unsubscribe('topic/');
    client.subscribe('topic/', MqttQos.atLeastOnce);
  } on Exception catch (e) {
    print('Exception: $e');
    client.disconnect();
    // Nothing to listen to on a client that failed to connect.
    return client;
  }

  // `?.` instead of `!`: a missing update stream must not crash the caller.
  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    // Handle every message in the notification, not only the first one.
    for (final received in c) {
      final message = received.payload as MqttPublishMessage;
      final payload =
          MqttPublishPayload.bytesToStringAsString(message.payload.message);
      try {
        data = jsonDecode(payload);
      } on FormatException catch (e) {
        // A payload that is not JSON must not raise an unhandled error inside
        // the stream listener; keep the previous value of `data`.
        print('Ignoring non-JSON payload from topic ${received.topic}: $e');
        continue;
      }
      print('Received message:$payload from topic: ${received.topic}');
    }
  });
  return client;
}