无法创建两个指向同一目标数据集的数据出厂管道活动



我正在寻找一种解决方案,以将来自2个不同数据库的SQL DW DMV加载到一个SQL DW表的单个表中。

我参加了ADF管道活动 - 有助于每15分钟加载数据,但是我看到了一个问题 - 当我在一个管道中创建两个活动时,它具有2个不同的源(输入数据集),但两者都将数据加载到同一目标(输出数据集)中。我还想确保 - 我在活动之间建立依赖性,以免它们同时运行。活动2仅在活动1完成/不运行后才开始。

我的ADF代码如下:

{
     "name": "Execution_Requests_Hist",
"properties": {
    "description": "Execution Requests history data",
    "activities": [
        {
            "type": "Copy",
            "typeProperties": {
                "source": {
                    "type": "SqlDWSource",
                    "sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
                },
                "sink": {
                    "type": "SqlDWSink",
                    "writeBatchSize": 0,
                    "writeBatchTimeout": "00:00:00"
                },
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
                },
                "enableSkipIncompatibleRow": true
            },
            "inputs": [
                {
                    "name": "ID_Exec_Requests"
                }
            ],
            "outputs": [
                {
                    "name": "OD_Exec_Requests"
                }
            ],
            "policy": {
                "timeout": "1.00:00:00",
                "concurrency": 1,
                "executionPriorityOrder": "NewestFirst",
                "style": "StartOfInterval",
                "retry": 3,
                "longRetry": 0,
                "longRetryInterval": "00:00:00"
            },
            "scheduler": {
                "frequency": "Minute",
                "interval": 15
            },
            "name": "PRD_DMV_Load"
        },
        {
            "type": "Copy",
            "typeProperties": {
                "source": {
                    "type": "SqlDWSource",
                    "sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
                },
                "sink": {
                    "type": "SqlDWSink",
                    "writeBatchSize": 0,
                    "writeBatchTimeout": "00:00:00"
                },
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
                },
                "enableSkipIncompatibleRow": true
            },
            "inputs": [
                {
                    "name": "OD_Exec_Requests",
                    "name": "ITG_Exec_Requests"
                }
            ],
            "outputs": [
                {
                    "name": "OD_Exec_Requests"
                }
            ],
            "policy": {
                "timeout": "1.00:00:00",
                "concurrency": 1,
                "executionPriorityOrder": "NewestFirst",
                "style": "StartOfInterval",
                "retry": 3,
                "longRetry": 0,
                "longRetryInterval": "00:00:00"
            },
            "scheduler": {
                "frequency": "Minute",
                "interval": 15
            },
            "name": "ITG_DMV_Load"
        }           
    ],
    "start": "2017-08-20T04:22:00Z",
    "end": "2018-08-20T04:22:00Z",
    "isPaused": false,
    "hubName": "xyz-adf_hub",
    "pipelineMode": "Scheduled"
}

}

当我尝试部署此功能时 - 给出以下错误消息:

错误活动" prd_dmv_load"和'itg_dmv_load'具有相同的 输出数据集" OD_EXEC_REQUESTS"。两个活动无法输出 在同一活动期间相同的数据集。

如何解决这个问题?我可以说 - 仅在PRD_DMV_LOAD完成后运行ITG_DMV_LOAD?

您在这里有两个问题。

您无法从两个不同的活动/管道中产生相同的数据集切片。要解决这个方法,您可以创建另一个数据集,该数据集将指向同一表上,但是从ADF角度来看,这将是不同的接收器。您还需要将第二个活动移至单独的管道配置中(因此,您最终会有一个活动)。

您需要以某种方式订购管道。我看到了两种可能的方法:您可以尝试使用调度程序配置选项-E.Q。您可以使用offset属性(或style)在间隔中间安排一个管道:例如,如果第一个管道是这样配置的:

 "scheduler": {
     "frequency": "Minute",
     "interval": 15
 },

这样的第二种配置:

"scheduler": {
     "frequency": "Minute",
     "interval": 15,
     "offset" : 5
},

这种方法可能需要进行一些调整取决于您的管道需要多长时间完成。

另一种方法是将第一管道的输出指定为第二个输入。在这种情况下,第二个活动直到第一个活动才完成。在这种情况下,活动时间表必须匹配(即两者都具有相同的scheduler.frequencyscheduler.interval)。

正如@arghtype所说,您不能在两个活动的管道或活动中使用相同的ADF数据集。您需要为ITG_DMV_LOAD创建第二个相同的输出数据集,但您不必拆分管道。您可以通过使第一个输出成为第二个次级输入,直到第一个活动才能确保第二个活动才能运行。我会建议这样的东西...

    {
     "name": "Execution_Requests_Hist",
"properties": {
    "description": "Execution Requests history data",
    "activities": [
        {
            "type": "Copy",
            "typeProperties": {
                "source": {
                    "type": "SqlDWSource",
                    "sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
                },
                "sink": {
                    "type": "SqlDWSink",
                    "writeBatchSize": 0,
                    "writeBatchTimeout": "00:00:00"
                },
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
                },
                "enableSkipIncompatibleRow": true
            },
            "inputs": [
                {
                    "name": "ID_Exec_Requests"
                }
            ],
            "outputs": [
                {
                    "name": "OD_Exec_Requests_PRD"
                }
            ],
            "policy": {
                "timeout": "1.00:00:00",
                "concurrency": 1,
                "executionPriorityOrder": "NewestFirst",
                "style": "StartOfInterval",
                "retry": 3,
                "longRetry": 0,
                "longRetryInterval": "00:00:00"
            },
            "scheduler": {
                "frequency": "Minute",
                "interval": 15
            },
            "name": "PRD_DMV_Load"
        },
        {
            "type": "Copy",
            "typeProperties": {
                "source": {
                    "type": "SqlDWSource",
                    "sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
                },
                "sink": {
                    "type": "SqlDWSink",
                    "writeBatchSize": 0,
                    "writeBatchTimeout": "00:00:00"
                },
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
                },
                "enableSkipIncompatibleRow": true
            },
            "inputs": [
                {
                    "name": "ITG_Exec_Requests",
                    "name": "OD_Exec_Requests_PRD"
                }
            ],
            "outputs": [
                {
                    "name": "OD_Exec_Requests_ITG"
                }
            ],
            "policy": {
                "timeout": "1.00:00:00",
                "concurrency": 1,
                "executionPriorityOrder": "NewestFirst",
                "style": "StartOfInterval",
                "retry": 3,
                "longRetry": 0,
                "longRetryInterval": "00:00:00"
            },
            "scheduler": {
                "frequency": "Minute",
                "interval": 15
            },
            "name": "ITG_DMV_Load"
        }           
    ],
    "start": "2017-08-20T04:22:00Z",
    "end": "2018-08-20T04:22:00Z",
    "isPaused": false,
    "hubName": "xyz-adf_hub",
    "pipelineMode": "Scheduled"
}

相关内容

  • 没有找到相关文章