从Azure DataFactory执行存储过程



我正在尝试从Azure DataFactory V2中执行Azure SQL数据库中的存储过程。该过程将使用来自平桌的数据进行一些UPSERT。根据MS规格,您需要具有一个有价值的参数来制作此类功能,但是将管道活动耦合到过程和所有模型。有什么方法可以定义数据集并复制活动,以便仅执行存储过程?

下面的jsons来自手臂模板:

DataSet:    
{"type": "datasets",
          "name": "AzureSQLProcedureDS",
          "dependsOn": [
            "[parameters('dataFactoryName')]",
            "[parameters('destinationLinkedServiceName')]"
          ],
          "apiVersion": "[variables('apiVersion')]",
          "properties": {
            "type": "AzureSqlTable",
            "linkedServiceName": {
              "referenceName": "[parameters('destinationLinkedServiceName')]",
              "type": "LinkedServiceReference"
            },
            "typeProperties": {
              "tableName": "storedProcedureExecutions"
            }
          }}


    Activity:
    {"name": "ExecuteHarmonizationProcedure",
                    "description": "Executes the procedure that Harmonizes the Data",
                    "type": "Copy",
                    "inputs": [
                      {
                        "referenceName": "[parameters('destinationDataSetName')]",
                        "type": "DatasetReference"
                      }
                    ],
                    "outputs": [
                      {
                        "referenceName": "AzureSQLProcedureDS",
                        "type": "DatasetReference"
                      }
                    ],
                    "typeProperties": {
                      "source": {
                        "type": "SqlSink"
                      },
                      "sink": {
                        "type": "SqlSink",
                        //"SqlWriterTableType": "storedProcedureExecutionsType",
                        "SqlWriterStoredProcedureName": "@Pipeline().parameters.procedureName",
                        "storedProcedureParameters": {
                          "param1": {
                            "value": "call from adf" 
                          }
                        }
                      }
                    }
}

考虑到MS对此主题没有太多帮助。

,任何帮助都将不胜感激。

我不确定我是否正确理解问题,您只想从复制活动中调用存储过程?

这样做很容易,在复制活动中,您可以在源内定义SQLReaderQuery属性。此属性使您可以输入T-SQL命令,因此您可以执行类似的操作:

 "typeProperties": {
        "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "EXEC sp_Name; select 1 as test"
        },
 . . .

复制活动始终期望查询的结果,因此,如果您仅包含对存储过程的调用,那就不是为什么我包括查询的第二部分。

用您要使用的参数替换。

按照@martin的建议,我们设法使执行工作。以下是我们所做的:

  1. 在SQL中创建虚拟表:

    CREATE TABLE [dbo].[dummyTable]( [col1] [NVARCHAR](100) NULL)
    
  2. 创建一个存储过程:

    CREATE PROCEDURE [dbo].[sp_testHarmonize]
         @param1 NVARCHAR(200)
    AS
    BEGIN
        INSERT INTO storedProcedureExecutions
        VALUES (@param1, GETDATE());
    END
    
  3. 存储过程的数据集:

    {
        "type": "datasets",
        "name": "[parameters('dummySQLTableDataSet')]",
        "dependsOn": ["[parameters('dataFactoryName')]",
        "[parameters('datalakeLinkedServiceName')]"],
        "apiVersion": "[variables('apiVersion')]",
        "properties": {
                "type": "AzureSqlTable",
                "linkedServiceName": {
                    "referenceName": "[parameters('databaseLinkedServiceName')]",
                    "type": "LinkedServiceReference"
                },
        "typeProperties": {
                "tableName": "dummyTable"
        }
    }
    
  4. 管道活动:

    {
        "name": "ExecuteHarmonizationProcedure",
        "dependsOn": [{
            "activity": "CopyCSV2SQL",
            "dependencyConditions": ["Succeeded"]
        }],
        "description": "Executes the procedure that Harmonizes the Data",
        "type": "Copy",
        "inputs": [{
            "referenceName": "[parameters('dummySQLTableDataSet')]",
            "type": "DatasetReference"
        }],
        "outputs": [{
            "referenceName": "[parameters('dummySQLTableDataSet')]",
            "type": "DatasetReference"
        }],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderQuery": "@Pipeline().parameters.SQLCommand"
            },
            "sink": {
                "type": "SqlSink"
            }
        }
    }
    
  5. 在SQL命令中使用以下参数运行管道:

    $"EXEC sp_testHarmonize 'call from ADF at {DateTime.Now}'; select top 1 * from dummyTable;"
    

这使它起作用了,但是考虑到它是在虚拟桌子上插入的,它看起来比直接解决方案更像是一种工作。如果没有更多直接解决方案,这是最简单的方法。

相关内容

  • 没有找到相关文章

最新更新