我正在尝试从Azure DataFactory V2中执行Azure SQL数据库中的存储过程。该过程将使用来自平桌的数据进行一些UPSERT。根据MS规格,您需要具有一个有价值的参数来制作此类功能,但是将管道活动耦合到过程和所有模型。有什么方法可以定义数据集并复制活动,以便仅执行存储过程?
下面的jsons来自手臂模板:
DataSet:
{"type": "datasets",
"name": "AzureSQLProcedureDS",
"dependsOn": [
"[parameters('dataFactoryName')]",
"[parameters('destinationLinkedServiceName')]"
],
"apiVersion": "[variables('apiVersion')]",
"properties": {
"type": "AzureSqlTable",
"linkedServiceName": {
"referenceName": "[parameters('destinationLinkedServiceName')]",
"type": "LinkedServiceReference"
},
"typeProperties": {
"tableName": "storedProcedureExecutions"
}
}}
Activity:
{"name": "ExecuteHarmonizationProcedure",
"description": "Executes the procedure that Harmonizes the Data",
"type": "Copy",
"inputs": [
{
"referenceName": "[parameters('destinationDataSetName')]",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureSQLProcedureDS",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SqlSink"
},
"sink": {
"type": "SqlSink",
//"SqlWriterTableType": "storedProcedureExecutionsType",
"SqlWriterStoredProcedureName": "@Pipeline().parameters.procedureName",
"storedProcedureParameters": {
"param1": {
"value": "call from adf"
}
}
}
}
}
考虑到MS对此主题没有太多帮助。
我不确定我是否正确理解问题,您只想从复制活动中调用存储过程?
这样做很容易,在复制活动中,您可以在源内定义SQLReaderQuery属性。此属性使您可以输入T-SQL命令,因此您可以执行类似的操作:
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "EXEC sp_Name; select 1 as test"
},
. . .
复制活动始终期望查询的结果,因此,如果您仅包含对存储过程的调用,那就不是为什么我包括查询的第二部分。
用您要使用的参数替换。
按照@martin的建议,我们设法使执行工作。以下是我们所做的:
在SQL中创建虚拟表:
CREATE TABLE [dbo].[dummyTable]( [col1] [NVARCHAR](100) NULL)
创建一个存储过程:
CREATE PROCEDURE [dbo].[sp_testHarmonize] @param1 NVARCHAR(200) AS BEGIN INSERT INTO storedProcedureExecutions VALUES (@param1, GETDATE()); END
存储过程的数据集:
{ "type": "datasets", "name": "[parameters('dummySQLTableDataSet')]", "dependsOn": ["[parameters('dataFactoryName')]", "[parameters('datalakeLinkedServiceName')]"], "apiVersion": "[variables('apiVersion')]", "properties": { "type": "AzureSqlTable", "linkedServiceName": { "referenceName": "[parameters('databaseLinkedServiceName')]", "type": "LinkedServiceReference" }, "typeProperties": { "tableName": "dummyTable" } }
管道活动:
{ "name": "ExecuteHarmonizationProcedure", "dependsOn": [{ "activity": "CopyCSV2SQL", "dependencyConditions": ["Succeeded"] }], "description": "Executes the procedure that Harmonizes the Data", "type": "Copy", "inputs": [{ "referenceName": "[parameters('dummySQLTableDataSet')]", "type": "DatasetReference" }], "outputs": [{ "referenceName": "[parameters('dummySQLTableDataSet')]", "type": "DatasetReference" }], "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "@Pipeline().parameters.SQLCommand" }, "sink": { "type": "SqlSink" } } }
在SQL命令中使用以下参数运行管道:
$"EXEC sp_testHarmonize 'call from ADF at {DateTime.Now}'; select top 1 * from dummyTable;"
这使它起作用了,但是考虑到它是在虚拟桌子上插入的,它看起来比直接解决方案更像是一种工作。如果没有更多直接解决方案,这是最简单的方法。