Azure物联网集线器触发按顺序处理数据



我有一个物联网设备(ESP32(,它每x秒向物联网集线器(免费层(发送数据包。每个数据包都包含一个整数属性PacketID,每次发送新数据包时,该属性都会从1开始递增,依此类推。当IoT Hub接收到数据时,Azure IoT集线器触发器(托管在门户网站上(会被调用,数据会被进一步处理-首先插入CosmosDB(从StorageQueue触发器(,然后作为消息发送到SignalR服务(来自另一个StorageQueue触发器(和客户端web应用程序。

物联网中心触发器如下:

[FunctionName("FramesPacketAdd_IoTHubTrigger")]
public async Task Run(
[IoTHubTrigger("messages/events", Connection = "IoTHubConnectionString")] EventData message, ILogger _logger,
[Queue("dbinsert-frames-packet-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<FramesPacket> dbInsertFramesPacketQueue,
[Queue("eventdata-parse-error-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<string> eventDataParseErrorQueue)
{        
try
{
_logger.LogInformation($"C# IoT Hub trigger function FramesPacketAdd processed a message: {Encoding.UTF8.GetString(message.Body.Array)}");
string messageBody = Encoding.UTF8.GetString(message.Body.Array);
var jsonObj = JsonConvert.DeserializeObject<FramesPacket>(messageBody);
var framesPacket = new FramesPacket
{
PacketID = jsonObj.PacketID,
SessionID = jsonObj.SessionID,                    
DeviceID = jsonObj.DeviceID,
Frames = jsonObj.Frames
};
await dbInsertFramesPacketQueue.AddAsync(framesPacket);
}
catch (Exception ex)
{
await eventDataParseErrorQueue.AddAsync($"[IoTHubTrigger] function [FramesPacketAdd] caught exception: {ex.Message} nStackTrace: {ex.StackTrace} nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
_logger.LogError($"[IoTHubTrigger] function [FramesPacketAdd_IoTHubTrigger] caught exception: {ex.Message} nStackTrace: {ex.StackTrace} nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
}
}

问题是,当数据包发送速度很快时,比如每个数据包之间的间隔为200ms,IoT集线器接收到的数据就太乱了,这意味着有时数据包#7和#8是在数据包#1、#2和#3之前接收和处理的,这对我的客户端应用程序来说是一个问题,因为它依赖于数据包的接收顺序与从ESP32芯片发送的数据包的顺序相同。我尝试以1秒的延迟发送每个数据包,但问题仍然存在,但它似乎只影响前三个数据包——有时它们是按3、1、2的顺序接收的,其余的按正确的顺序接收。更长的延迟似乎完全消除了这个问题。我相信这是因为函数/触发器的异步性质?最后,我希望能够相对快速地发送它们,每个之间有200-500ms的延迟。

我是Azure IoT Hub的新手,我的问题是,是否可以在门户端做一些事情来确保数据包以正确的顺序接收,或者这是这种方法的限制吗?这种情况需要在接收到数据后处理,可能在调用IoT集线器触发器后使用存储队列或服务总线?

我希望我的问题有道理,否则我非常乐意提供更多细节。

提前谢谢。

以下是两种可能的路径:

1.使用EventHub触发的Azure功能来处理来自物联网中心的事件中心兼容端点的消息。

可以保证来自特定设备的每条消息都会发送到同一个分区。

分区中的消息是按顺序排列的,因此您需要的是每个分区的事件处理器。

你可以使用Azure函数来实现这一点,在这种情况下,Azure函数所做的是为每个分区创建一个租约,这使得每个分区只有一个Azure函数实例。这意味着,如果你有10个分区,就会有10个Azure功能实例处理来自每个分区的消息。现在,您要做的是确保在Azure功能中,按顺序处理消息批,例如:

[FunctionName("EventHubTrigger")]
public static async Task RunAsync([EventHubTrigger("ordered", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log)
{
log.Info($"Triggered batch of size {eventDataSet.Length}");

//processing event by event
foreach (var eventData in eventDataSet) {
try
{
// process the event here
}
catch
{
// handle event exception
}
}
}

这里,在foreach循环之前,除了本机行为之外,您还可以根据sequenceNumbereventDataSet进行排序(如果您正在发送它(,仅针对这种情况。

这是博客。

2.第二种方法非常相似,是关于使用服务总线队列和会话,如以下代码示例所示:

public async Task Run(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message, 
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
// process the message here
}

这也是博客。

需要注意的一件重要事情是,如果任何Azure功能实例(在第一种情况下(失败,租约将被续订,并且可能会处理一些消息两次。如果这是一个问题,那么您应该使用第二种情况

IoT集线器按设备ID对数据进行分区(因此,来自单个设备的所有消息都会按顺序写入同一底层事件集线器分区(。。因此,假设你在ESP32应用程序中只使用一个设备ID,它们应该按照IoT Hub接收它们的顺序。

你是否将消息转储到azure函数日志中,看看它们是否按正确的顺序读取(即,数据库插入可能会让它们乱序(

否则,我不确定azure函数监听器(应该在封面下使用eventprocessorhost(的管道是如何无序地读取的

最新更新