在azure和f管道中选择查询



如何通过从存储在azure blob中的文件中获取数据来传递azure adf管道中的select查询。

For example: my file(test) in blob是有一个单一的值

在我的adf管道中,在select query中,我们从表中获取值。

Sqlquery:"Select * from emp where id =(需要从test中获取值)"

是可能的吗?

NB此答案指Azure数据工厂(ADF) v1

v2支持通过设置变量活动设置变量。

v1,我认为这在管道中是不可能的,因为Azure数据工厂并不真正支持这种方式的变量。另一种方法是将文件从blob加载到运行其他查询的相同数据库中,创建一个连接两者的存储过程,然后使用存储过程活动调用存储过程,例如:

SELECT * 
FROM emp 
WHERE id = ( SELECT id FROM yourBlobStagingTable );

希望你明白。

最初的答案是ADF的v1版本,在这个版本中,这个功能在当时是不可能的。

从ADF的v2 (GA 2018年6月)开始,现在可以按照下面演示的方式传递参数。因此,下面的管道定义现在将使这成为可能。

您想要的是查找从blob存储中获取值。然后,您可以使用@concat字符串函数将其用作查询中的参数。如何从blob中获得正确的价值取决于您的设计。您可以使用查找或元数据。

下面是一个示例管道。

{
"name": "pipeline1",
"properties": {
    "activities": [
        {
            "name": "get_variable_from_file",
            "type": "Lookup",
            "dependsOn": [],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "DelimitedTextSource",
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "recursive": true,
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings",
                        "skipLineCount": 1
                    }
                },
                "dataset": {
                    "referenceName": "blob_storage",
                    "type": "DatasetReference"
                },
                "firstRowOnly": true
            }
        },
        {
            "name": "use_variable_in_query",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "get_variable_from_file",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "AzurePostgreSqlSource",
                    "query": {
                        "value": "@concat('select * from schema.emp where id=',activity('get_variable_from_file').output)",
                        "type": "Expression"
                    }
                },
                "enableStaging": false
            },
            "inputs": [
                {
                    "referenceName": "postgres_database",
                    "type": "DatasetReference",
                    "parameters": {
                        "table_name": "emp"
                    }
                }
            ]
        }
    ],
    "annotations": []
}

}

相关内容

  • 没有找到相关文章

最新更新