流分析 - 如何处理引用输入中的 json



我有一个 Azure 流分析 (ASA( 作业,用于处理来自事件中心的设备遥测数据。流应与 sql 表中的引用数据联接,以使用其他设备元数据增强每条消息。合并的条目应存储在 CosmosDB 中。

用于提供设备元数据的 sql 数据库:

CREATE TABLE [dbo].[MyTable]
(
[DeviceId] NVARCHAR(20) NOT NULL PRIMARY KEY, 
[MetaData] NVARCHAR(MAX) NULL   /* this stores json, which can vary per record */
)

在 ASA 中,我使用一个简单的查询配置了参考数据输入:

SELECT DeviceId, JSON_QUERY(MetaData) FROM [dbo].[MyTable]

我有执行连接的主要 ASA 查询:

WITH temptable AS (
SELECT * FROM [telemetry-input] TD PARTITION BY PartitionId
LEFT OUTER JOIN [metadata-input] MD
ON TD.DeviceId = MD.DeviceId
)
SELECT TD.*, MD.MetaData 
INTO [cosmos-db-output] 
FROM temptable PARTITION BY PartitionId

这一切都有效,合并的数据存储在 CosmosDB 中。但是,sql 中的元数据列的值被视为字符串,并存储在带有引号和转义字符的 comos 中。例:

{ "DeviceId" : "abc1234", … , "MetaData" : "{ "TestKey": "test value" }" };

有没有办法将元数据中的 json 处理和存储为适当的 Json 对象,即

{ "DeviceId" : "abc1234", … , "MetaData" : { "TestKey": "test value" } };

我找到了在 ASA 中实现它的方法 - 您需要创建 JavaScript 用户函数:

function parseJson(strjson){
return JSON.parse(strjson);
}

并在查询中调用它:

...
SELECT TD.*, udf.parseJson(MD.MetaData)
...

正如您在问题中提到的,参考 json 数据被视为 json 字符串,而不是 json 对象。根据我对 ASA 中查询语法的研究,没有内置函数来转换它。

但是,我建议使用 Azure 函数 Cosmos DB 触发器来处理创建的每个文档。请参考我的函数代码:

using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;
namespace ProcessJson
{
public class Class1
{
[FunctionName("DocumentUpdates")]
public static void Run(
[CosmosDBTrigger(databaseName:"db",collectionName: "item", ConnectionStringSetting = "CosmosDBConnection",LeaseCollectionName = "leases",
CreateLeaseCollectionIfNotExists = true)]
IReadOnlyList<Document> documents,
TraceWriter log)
{
log.Verbose("Start.........");
String endpointUrl = "https://***.documents.azure.com:443/";
String authorizationKey = "***";
String databaseId = "db";
String collectionId = "import";
DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey);
for (int i = 0; i < documents.Count; i++)
{
Document doc = documents[i];
if((doc.alreadyFormat == Undefined.Value) ||(!doc.alreadyFormat)){
String MetaData = doc.GetPropertyValue<String>("MetaData");
JObject o = JObject.Parse(MetaData);
doc.SetPropertyValue("MetaData", o);
doc.SetPropertyValue("alreadyFormat", true);
client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, doc.Id), doc); 
log.Verbose("Update document Id " + doc.Id);
}
}
}
}
}

此外,请参考案例:Azure Cosmos DB SQL - 如何取消转义内部 json 属性

最新更新