I am looking for a solution to load SQL DW DMV data from 2 different databases into a single table in one SQL DW.
I set up an ADF pipeline with Copy activities that loads the data every 15 minutes, but I am running into a problem: I created two activities in one pipeline, with 2 different sources (input datasets), but both load data into the same destination (output dataset). I also want to make sure there is a dependency between the activities so that they never run at the same time - activity 2 should start only after activity 1 has completed / is not running.
My ADF code is below:
{
"name": "Execution_Requests_Hist",
"properties": {
"description": "Execution Requests history data",
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlDWSource",
"sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
},
"sink": {
"type": "SqlDWSink",
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
},
"translator": {
"type": "TabularTranslator",
"columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
},
"enableSkipIncompatibleRow": true
},
"inputs": [
{
"name": "ID_Exec_Requests"
}
],
"outputs": [
{
"name": "OD_Exec_Requests"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"executionPriorityOrder": "NewestFirst",
"style": "StartOfInterval",
"retry": 3,
"longRetry": 0,
"longRetryInterval": "00:00:00"
},
"scheduler": {
"frequency": "Minute",
"interval": 15
},
"name": "PRD_DMV_Load"
},
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlDWSource",
"sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
},
"sink": {
"type": "SqlDWSink",
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
},
"translator": {
"type": "TabularTranslator",
"columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
},
"enableSkipIncompatibleRow": true
},
"inputs": [
{
"name": "OD_Exec_Requests",
"name": "ITG_Exec_Requests"
}
],
"outputs": [
{
"name": "OD_Exec_Requests"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"executionPriorityOrder": "NewestFirst",
"style": "StartOfInterval",
"retry": 3,
"longRetry": 0,
"longRetryInterval": "00:00:00"
},
"scheduler": {
"frequency": "Minute",
"interval": 15
},
"name": "ITG_DMV_Load"
}
],
"start": "2017-08-20T04:22:00Z",
"end": "2018-08-20T04:22:00Z",
"isPaused": false,
"hubName": "xyz-adf_hub",
"pipelineMode": "Scheduled"
}
}
When I try to deploy this, it gives the following error message:
Error: Activities 'PRD_DMV_Load' and 'ITG_DMV_Load' have the same output dataset 'OD_Exec_Requests'. Two activities cannot output the same dataset during the same activity period.
How do I solve this? Can I specify that ITG_DMV_Load should only run after PRD_DMV_Load has completed?
You have two problems here.
You cannot produce the same dataset slice from two different activities/pipelines. To work around this, you can create another dataset that points to the same table but, from the ADF point of view, is a different sink (a sketch of that is below). You also need to move the second activity into a separate pipeline configuration, so you end up with one activity per pipeline.
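For example, the second dataset could look roughly like this (a minimal sketch only - the dataset type, linked service name, table name and availability here are assumptions, so reuse whatever your existing OD_Exec_Requests dataset defines):
{
    "name": "OD_Exec_Requests_2",
    "properties": {
        "type": "AzureSqlDWTable",
        "linkedServiceName": "DestinationSqlDWLinkedService",
        "typeProperties": {
            "tableName": "dbo.Exec_Requests_Hist"
        },
        "availability": {
            "frequency": "Minute",
            "interval": 15
        }
    }
}
From the database's point of view both datasets write to the same table; ADF simply sees two distinct sinks, which is enough to get past the validation error.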
You also need to order the pipelines somehow. I see two possible ways: you can try playing with the scheduler configuration options - e.g. you can use the offset property (or style) to schedule one pipeline in the middle of the interval. For example, if the first pipeline is configured like this:
"scheduler": {
"frequency": "Minute",
"interval": 15
},
and the second one like this:
"scheduler": {
"frequency": "Minute",
"interval": 15,
"offset" : 5
},
This approach may require some tuning, depending on how long your pipelines take to complete.
Another approach is to specify the output of the first pipeline as an input of the second. In that case the second activity will not start until the first one has completed. Note that the activity schedules must match (i.e. both must have the same scheduler.frequency and scheduler.interval).
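Concretely, that just means listing the first activity's output dataset as an extra entry in the second activity's inputs, alongside its real source dataset. A sketch using the dataset names from the question (assuming the first activity keeps OD_Exec_Requests as its output):
"inputs": [
    {
        "name": "ITG_Exec_Requests"
    },
    {
        "name": "OD_Exec_Requests"
    }
],
The copy itself still reads via the sqlReaderQuery against the first input's linked service; the extra input only makes ADF wait for that slice of OD_Exec_Requests to be ready before running the second activity.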
As @arghtype says, you can't use the same ADF dataset as the output of two activities or pipelines. You will need to create a second, identical output dataset for ITG_DMV_Load, but you don't have to split the pipeline. You can ensure the second activity doesn't run until the first has finished by making the first activity's output a secondary input of the second. I would suggest something like this...
{
"name": "Execution_Requests_Hist",
"properties": {
"description": "Execution Requests history data",
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlDWSource",
"sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
},
"sink": {
"type": "SqlDWSink",
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
},
"translator": {
"type": "TabularTranslator",
"columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
},
"enableSkipIncompatibleRow": true
},
"inputs": [
{
"name": "ID_Exec_Requests"
}
],
"outputs": [
{
"name": "OD_Exec_Requests_PRD"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"executionPriorityOrder": "NewestFirst",
"style": "StartOfInterval",
"retry": 3,
"longRetry": 0,
"longRetryInterval": "00:00:00"
},
"scheduler": {
"frequency": "Minute",
"interval": 15
},
"name": "PRD_DMV_Load"
},
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlDWSource",
"sqlReaderQuery": "select * from dm_pdw_exec_requests_hist_view"
},
"sink": {
"type": "SqlDWSink",
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
},
"translator": {
"type": "TabularTranslator",
"columnMappings": "request_id:request_id,session_id:session_id,status:status,submit_time:submit_time,start_time:start_time,end_compile_time:end_compile_time,total_elapsed_time:total_elapsed_time,end_time:end_time,label:label,error_id:error_id,command:command,resource_class:resource_class,database_id:database_id,login_name:login_name,app_name:app_name,client_id:client_id,DMV_Source:DMV_Source,source:source,type:type,create_time:create_time,details:details"
},
"enableSkipIncompatibleRow": true
},
"inputs": [
{
"name": "ITG_Exec_Requests",
"name": "OD_Exec_Requests_PRD"
}
],
"outputs": [
{
"name": "OD_Exec_Requests_ITG"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"executionPriorityOrder": "NewestFirst",
"style": "StartOfInterval",
"retry": 3,
"longRetry": 0,
"longRetryInterval": "00:00:00"
},
"scheduler": {
"frequency": "Minute",
"interval": 15
},
"name": "ITG_DMV_Load"
}
],
"start": "2017-08-20T04:22:00Z",
"end": "2018-08-20T04:22:00Z",
"isPaused": false,
"hubName": "xyz-adf_hub",
"pipelineMode": "Scheduled"
}
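Note that OD_Exec_Requests_PRD and OD_Exec_Requests_ITG have to exist as two separate dataset definitions, even though both can point at the same physical table (see the dataset sketch in the previous answer - only the "name" needs to differ). For the chaining to work, keep each dataset's availability in line with the 15-minute activity schedule used above, e.g.:
"availability": {
    "frequency": "Minute",
    "interval": 15
}
Otherwise the slices produced by PRD_DMV_Load won't line up with the slices ITG_DMV_Load depends on, and the second activity won't wait the way you expect.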