我正在使用 Azure 数据工厂的复制活动将数据从 blob 中的 csv 文件复制到 CosmosDB(使用 SQL API(。在接收器的链接服务中,如果我不导入任何架构,则执行时的复制活动会从 CSV 读取标头,然后将数据以 json 形式保存在 cosmosDB 中。直到这里它工作正常。
我需要在 cosmosDB 中添加的数据中添加批处理 ID 列(批处理 ID 为 GUID/pipelinerunID(,以便我可以跟踪集中的哪些数据都作为批处理复制。
如何保留所有源列,并在其中添加批处理 ID 列并将其保存在我的 cosmos DB 中。
架构不是固定的,可以在每个 adf 管道触发器上更改,因此无法执行导入架构并在复制活动中执行一对一列映射。
据我所知,将数据从 csv 传输到 cosmos db 时,无法添加自定义列。我建议你在将文档创建到数据库时使用 Azure 函数 Cosmos DB 触发器添加 batchId,作为解决方法。
#r "Microsoft.Azure.Documents.Client"
#r "Newtonsoft.Json"
#r "Microsoft.Azure.DocumentDB.Core"
using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.Documents.Client;a
public static void Run(IReadOnlyList<Document> documents, TraceWriter log)
{
if (documents != null && documents.Count > 0)
{
private static readonly string endpointUrl = "https://***.documents.azure.com:443/";
private static readonly string authorizationKey = "***";
private static readonly string databaseId = "db";
private static readonly string collectionId = "coll";
private static DocumentClient client;
documents[0].SetPropertyValue("batchId","123");
var document = client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, documents[0].id), documents[0]).Result.Resource;
log.Verbose("document Id " + documents[0].Id);
}
}
但是,似乎需要自己指定与 Azure 数据工厂中的batchId
不匹配的batchId
。
希望对您有所帮助。