将单个json从azure iot集线器存储到datalake2



我添加了iot集线器和设备。iot-hub的所有数据都以json格式保存到数据湖2。工作正常,但如果同时有来自设备的多条消息,则会将其保存在一个json中。它引起了一些麻烦。。。有没有办法将每个消息事件保存在一个单独的json中?我浏览了iot集线器的设置,但一无所获。

在IoT集线器路由机制中,没有始终将单个消息转发到存储的此类设置。基本上,这个需求可以通过azure函数在流管道使用者(IoTHubTrigger(或事件网格订阅者(EventGridTrigger(中实现。

更新:

以下是IoTHubTrigger函数的示例,其中输出blob绑定到Data Lake Storage Gen2:的容器

run.csx:

#r "Microsoft.Azure.EventHubs"
#r "Newtonsoft.Json"
#r "Microsoft.WindowsAzure.Storage"
using System;
using System.IO;
using System.Text;
using System.Linq;
using Microsoft.Azure.EventHubs;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static async Task Run(EventData ed, CloudBlockBlob outputBlob, ILogger log)
{   
//log.LogInformation($"DeviceId = {ed.SystemProperties["iothub-connection-device-id"]}rn{JObject.Parse(Encoding.ASCII.GetString(ed.Body))}");  
var msg = new { 
EnqueuedTimeUtc = ed.SystemProperties["iothub-enqueuedtime"],
Properties = ed.Properties,
SystemProperties = new {
connectionDeviceId = ed.SystemProperties["iothub-connection-device-id"], 
connectionAuthMethod = ed.SystemProperties["iothub-connection-auth-method"],
connectionDeviceGenerationId = ed.SystemProperties["iothub-connection-auth-generation-id"],
enqueuedTime = ed.SystemProperties["iothub-enqueuedtime"]   
},
Body = JObject.Parse(Encoding.ASCII.GetString(ed.Body))
};
byte[] buffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
await outputBlob.UploadFromStreamAsync(new MemoryStream(buffer));
await Task.CompletedTask;
}

function.json:

{
"bindings": [
{
"name": "ed",
"connection": "rk2020iot_IOTHUB",
"eventHubName": "rk2020iot_IOTHUBNAME",
"consumerGroup": "function",
"cardinality": "one",
"direction": "in",
"type": "eventHubTrigger"
},
{
"name": "outputBlob",
"path": "iot/rk2020iot/{DateTime}.json",
"connection": "rk2020datalake2_STORAGE",
"direction": "out",
"type": "blob"
}
]
}

相关内容

  • 没有找到相关文章

最新更新