如何实时侦听和使用来自Azure IoT Hub的消息



我目前正在尝试使用Azure IoT Hub从事件中心兼容的IoT Hub默认消息/事件端点读取消息。为了尝试这一点,我编写了两个命令行应用程序,一个模拟设备并写入IoT Hub,另一个从IoT Hub消息/事件端点读取。

生产者每秒生成一条消息并将其写入IoT Hub。这似乎工作正常。但是当我启动阅读器/消费者时,它会收到一批消息并关闭应用程序。但与此同时,生产者仍然会产生消息。

我的期望是生产者每秒或随机产生消息,消费者"听"。到端点,如果有新消息到达,读取并显示它。遵循我的代码与生产者和消费者Azure IoT Hub。

生产者/模拟物联网设备

using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IoTGarage_Azure_01_Simulated_Device
{
class Program
{
private static DeviceClient s_deviceClient;
private readonly static string s_myDeviceId = "simulatedDevice";
private readonly static string s_iotHubUri = "<name.azure-devices.net>";
// Im IoT Hub > Geräte > Primärschlüssel
private readonly static string s_deviceKey = "<primary key>";
private static async Task Main()
{
Console.WriteLine("Routing Tutorial: Simulated devicen");
s_deviceClient = DeviceClient.Create(s_iotHubUri,
new DeviceAuthenticationWithRegistrySymmetricKey(s_myDeviceId, s_deviceKey), TransportType.Mqtt);
using var cts = new CancellationTokenSource();
var messages = SendDeviceToCloudMessagesAsync(cts.Token);
Console.WriteLine("Press the Enter key to stop.");
Console.ReadLine();
cts.Cancel();
await messages;
}
private static async Task SendDeviceToCloudMessagesAsync(CancellationToken token)
{
double minTemperature = 20;
double minHumidity = 60;
Random rand = new Random();
while (!token.IsCancellationRequested)
{
double currentTemperature = minTemperature + rand.NextDouble() * 15;
double currentHumidity = minHumidity + rand.NextDouble() * 20;
string infoString;
string levelValue;
if (rand.NextDouble() > 0.7)
{
if (rand.NextDouble() > 0.5)
{
levelValue = "critical";
infoString = "This is a critical message.";
}
else
{
levelValue = "storage";
infoString = "This is a storage message.";
}
}
else
{
levelValue = "normal";
infoString = "This is a normal message.";
}
var telemetryDataPoint = new
{
deviceId = s_myDeviceId,
temperature = currentTemperature,
humidity = currentHumidity,
pointInfo = infoString
};
var telemetryDataString = JsonConvert.SerializeObject(telemetryDataPoint);

// You can encode this as ASCII, but if you want it to be the body of the message, 
//  and to be able to search the body, it must be encoded in UTF with base64 encoding.
using var message = new Message(Encoding.UTF32.GetBytes(telemetryDataString));

//Add one property to the message.
message.Properties.Add("target", levelValue);
// Submit the message to the hub.
await s_deviceClient.SendEventAsync(message);
// Print out the message.
Console.WriteLine("{0} > Sent message: {1}", DateTime.Now, telemetryDataString);
await Task.Delay(1000);
}
}
}
}
消费者>
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace IoTGarage_Azure_02_IoTHub_ReadFromInternalEndpoint
{
class Program
{
private const string ehubNamespaceConnectionString = "<Endpoint=sb://>";
private const string eventHubName = "<iothubname>";
private const string blobStorageConnectionString = "<DefaultEndpointsProtocol=https;AccountName=EndpointSuffix=core.windows.net>";
private const string blobContainerName = "checkpointblob";
static async Task Main()
{
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromSeconds(30));
await processor.StopProcessingAsync();
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("tReceived event: {0}", Encoding.UTF32.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
}
}

您所看到的可能是由于IoTHub的消息保留策略相对于其内置的EventHub。这就解释了你最初看到的一大堆信息是从你的接收器上传来的。事实上,你的应用程序退出很可能是因为你让主线程退出。设置EventPosition=Latest只读取新消息

选项#1 -使用EventHubConsumerClient

using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace DotnetService
{
class Program
{
private const string EventHubsCompatibleEndpoint = "TODO: az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {hubname}";  
private const string EventHubsCompatiblePath = "TODO: {hubname}";  
private const string IotHubSasKey = "TODO: az iot hub policy show --name service --query primaryKey --hub-name {hubname}";  
private const string ConsumerGroup = "$Default";
private static EventHubConsumerClient eventHubConsumerClient = null;
private async static Task Setup()
{
string eventHubConnectionString = $"Endpoint={EventHubsCompatibleEndpoint.Replace("sb://", "amqps://")};EntityPath={EventHubsCompatiblePath};SharedAccessKeyName=service;SharedAccessKey={IotHubSasKey};";
eventHubConsumerClient = new EventHubConsumerClient(ConsumerGroup, eventHubConnectionString);
var tasks = new List<Task>();
var partitions = await eventHubConsumerClient.GetPartitionIdsAsync();
foreach (string partition in partitions)
{
tasks.Add(ReceiveMessagesFromDeviceAsync(partition));
}
}
static async Task ReceiveMessagesFromDeviceAsync(string partitionId)
{
Console.WriteLine($"Starting listener thread for partition: {partitionId}");
while (true)
{
await foreach (PartitionEvent receivedEvent in eventHubConsumerClient.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest))
{
string msgSource;
string body = Encoding.UTF8.GetString(receivedEvent.Data.Body.ToArray());
if (receivedEvent.Data.SystemProperties.ContainsKey("iothub-message-source"))
{
msgSource = receivedEvent.Data.SystemProperties["iothub-message-source"].ToString();
Console.WriteLine($"{partitionId} {msgSource} {body}");
}
}
}
}
static async Task Main(string[] args)
{
await Setup();
Console.ReadLine();
}
}
}

选项#2使用EventProcessorClient

using System;  
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace DotnetService
{
class Program
{
private const string ehubNamespaceConnectionString = "Endpoint=sb://...";
private const string eventHubName = "{iothubname}";
private const string blobStorageConnectionString = "DefaultEndpointsProtocol=...";
private const string blobContainerName = "{storagename}";
private static Task initializeEventHandler(PartitionInitializingEventArgs arg)
{
arg.DefaultStartingPosition = EventPosition.Latest;
return Task.CompletedTask;
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
static async Task Main()
{
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, ehubNamespaceConnectionString, eventHubName);
processor.PartitionInitializingAsync += initializeEventHandler;
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
Console.ReadLine();
}
}
}

最新更新