让我先说一下,我是逻辑应用和数据工厂的菜鸟。无论如何,我目前正在进行集成,其中一部分是我需要从逻辑应用触发数据工厂中的管道。我已经成功地做到了这一点,我似乎无法弄清楚的一件事是如何将参数传递给我的管道。我已经尝试在"参数"和"触发器"部分下更改JSON,但到目前为止还没有点击任何东西。管道最终执行,但仅使用默认参数。
有没有人在这方面取得了任何成功?任何帮助,不胜感激。
可以使用逻辑应用的"创建管道运行"操作的 body 属性将参数传递到管道。与往常一样,要小心,因为此操作不仅处于预览状态,而且我也无法在任何MS文档中找到此解决方案。我只是根据其他类似操作的格式进行了有根据的猜测。
例:
"Run_my_pipeline": {
"inputs": {
"host": {
"connection": {
"name": "@parameters('$connections')['azuredatafactory']['connectionId']"
}
},
"method": "post",
"body": {
"param1": "myParamValue",
"param2": "myParamValue"
},
"path": "...",
"queries": {
"x-ms-api-version": "2017-09-01-preview"
},
"authentication": "@parameters('$authentication')"
}
}
正如我在评论中所说,我使用 azure 函数创建了一个解决方法。Azure 函数和逻辑应用可以很好地协同工作。 在此链接中,可以看到如何使用 .net 创建和管理管道 https://learn.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-dot-net
如果你已经有 ADF 和管道,你只想运行它(使用管道(,那么你可以只
Dictionary<string, object> parameters = new Dictionary<string, object>
{
{"BoxSerialNumbers", req.BoxSerialNumbers},
{"StartDate", req.StartDate },
{"EndDate",req.EndDate },
{"Recipient", req.Recipient }
};//this is how you add initialaze parameters
var client = Authenticate(); //Authentication with azure
log.Info("Creating.");
CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);//run pipeline, you can do this async (it's better)
log.Info("Created.");
var response = new HttpResponseMessage();
if (client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId).Status.Equals("InProgress"))
{
response = new HttpResponseMessage(HttpStatusCode.OK)
{
Content = new StringContent(runResponse.RunId, Encoding.UTF8)
};
}
else
{
response = new HttpResponseMessage(HttpStatusCode.BadRequest)
{
Content = new StringContent("Pipeline didn't started", Encoding.UTF8)//just some validation for function
};
}
return response;
public static DataFactoryManagementClient Authenticate()
{
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationID, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
return new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionID };
}
因此,在请求中,可以从逻辑应用传递参数,使用 runId 可以检查状态。然后在逻辑应用中只需简单的 HTTP 请求即可调用此函数。希望这对某人有所帮助。
我使用了 DraganB 的解决方案,但呼叫签名打开
CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);
变了。 小的编辑使这项工作完美:
CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters: parameters);
这是适合任何需要它的人的功能。
[FunctionName("DatafactoryShim")]
public async static Task<HttpResponseMessage> Run(
[HttpTrigger(AuthorizationLevel.Function, "post")]
HttpRequestMessage req,
ExecutionContext context,
TraceWriter log
)
{
string messageBody = await req.Content.ReadAsStringAsync();
BlobToDatalakeFactoryParameters postValues = JsonHelper.ToClass<BlobToDatalakeFactoryParameters>(messageBody);
Dictionary<string, object> parameters = new Dictionary<string, object>
{
{"blobContainer", postValues.BlobContainer},
{"blobFolder", postValues.BlobFolder },
{"relativeDatalakeFolder", postValues.RelativeDatalakeFolder },
{"modelType", postValues.ModelType }
}; //this is how you add initialaze parameters
var client = Authenticate(); //Authentication with azure
string resourceGroup = ConfigurationManager.AppSettings["resourceGroup"];
string dataFactoryName = ConfigurationManager.AppSettings["dataFactoryName"];
string pipelineName = ConfigurationManager.AppSettings["pipelineName"];
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup,
dataFactoryName,
pipelineName,
parameters: parameters).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
var response = new HttpResponseMessage();
if (client.PipelineRuns.Get(ConfigurationManager.AppSettings["resourceGroup"],
ConfigurationManager.AppSettings["dataFactoryName"], runResponse.RunId).Status.Equals("InProgress"))
{
response = new HttpResponseMessage(HttpStatusCode.OK)
{
Content = new StringContent(runResponse.RunId, Encoding.UTF8)
};
}
else
{
response = new HttpResponseMessage(HttpStatusCode.BadRequest)
{
Content =
new StringContent("Pipeline didn't started", Encoding.UTF8) //just some validation for function
};
}
return response;
}