正在使用ADFV2创建具有计划触发器的管道



我试图将ADFV1中已有的管道迁移到ADFV2,但在触发器的概念上遇到了一些问题。我的管道有两个活动,第一个是Azure Data Lake Analytics活动,第二个是复制活动。第一个活动运行一个U-SQL脚本,该脚本从分区文件夹/{yyyy}/{MM}/{dd}/中读取数据,处理后写入文件夹/{yyyy}-{MM}-{dd}/。以下是我的数据工厂中的一些JSON文件(管道、触发器和数据集)。

管道:

{
  "name": "StreamCompressionBlob2SQL",
  "properties": {
    "activities": [
      {
        "name": "compress",
        "type": "DataLakeAnalyticsU-SQL",
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "typeProperties": {
          "scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
          "scriptLinkedService": {
            "referenceName": "AzureBlobStorage",
            "type": "LinkedServiceReference"
          },
          "parameters": {
            "Year": {
              "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
              "type": "Expression"
            },
            "Month": {
              "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
              "type": "Expression"
            },
            "Day": {
              "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
              "type": "Expression"
            }
          }
        },
        "linkedServiceName": {
          "referenceName": "AzureDataLakeAnalytics1",
          "type": "LinkedServiceReference"
        }
      },
      {
        "name": "Blob2SQL",
        "type": "Copy",
        "dependsOn": [
          {
            "activity": "compress",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "timeout": "7.00:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "typeProperties": {
          "source": {
            "type": "BlobSource",
            "recursive": true
          },
          "sink": {
            "type": "SqlSink",
            "writeBatchSize": 10000
          },
          "enableStaging": false,
          "dataIntegrationUnits": 0,
          "translator": {
            "type": "TabularTranslator",
            "columnMappings": {
              "tag": "TAG",
              "device_id": "DEVICE_ID",
              "system_id": "SYSTEM_ID",
              "utc": "UTC",
              "ts": "TS",
              "median": "MEDIAN",
              "min": "MIN",
              "max": "MAX",
              "avg": "AVG",
              "stdev": "STDEV",
              "first_value": "FIRST_VALUE",
              "last_value": "LAST_VALUE",
              "message_count": "MESSAGE_COUNT"
            }
          }
        },
        "inputs": [
          {
            "referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
            "type": "DatasetReference",
            "parameters": {
              "Year": {
                "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
                "type": "Expression"
              },
              "Month": {
                "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
                "type": "Expression"
              },
              "Day": {
                "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
                "type": "Expression"
              }
            }
          }
        ],
        "outputs": [
          {
            "referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "windowStartTime": {
        "type": "String"
      }
    }
  }
}

触发器:

{
  "name": "trigger1",
  "properties": {
    "runtimeState": "Started",
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "StreamCompressionBlob2SQL",
          "type": "PipelineReference"
        },
        "parameters": {
          "windowStartTime": "@trigger().scheduledTime"
        }
      }
    ],
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2018-08-17T10:46:00.000Z",
        "endTime": "2018-11-04T10:46:00.000Z",
        "timeZone": "UTC"
      }
    }
  }
}

复制活动的输入数据集:

{
  "name": "AzureBlobDataset_COMPRESSED_ASA_v1",
  "properties": {
    "linkedServiceName": {
      "referenceName": "AzureBlobStorage",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "Year": {
        "type": "String",
        "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
      },
      "Month": {
        "type": "String",
        "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')"
      },
      "Day": {
        "type": "String",
        "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')"
      }
    },
    "type": "AzureBlob",
    "structure": [
      { "name": "tag", "type": "String" },
      { "name": "device_id", "type": "String" },
      { "name": "system_id", "type": "String" },
      { "name": "utc", "type": "DateTime" },
      { "name": "ts", "type": "DateTime" },
      { "name": "median", "type": "Double" },
      { "name": "min", "type": "Double" },
      { "name": "max", "type": "Double" },
      { "name": "avg", "type": "Double" },
      { "name": "stdev", "type": "Double" },
      { "name": "first_value", "type": "Double" },
      { "name": "last_value", "type": "Double" },
      { "name": "message_count", "type": "Int16" }
    ],
    "typeProperties": {
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ";",
        "nullValue": "\\N",
        "treatEmptyAsNull": true,
        "skipLineCount": 0,
        "firstRowAsHeader": true
      },
      "fileName": "",
      "folderPath": {
        "value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
        "type": "Expression"
      }
    }
  },
  "type": "Microsoft.DataFactory/factories/datasets"
}

复制活动的输出数据集:

{
  "name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
  "properties": {
    "linkedServiceName": {
      "referenceName": "AzureSqlDatabase1",
      "type": "LinkedServiceReference"
    },
    "type": "AzureSqlTable",
    "structure": [
      { "name": "TAG", "type": "String" },
      { "name": "DEVICE_ID", "type": "String" },
      { "name": "SYSTEM_ID", "type": "String" },
      { "name": "UTC", "type": "DateTime" },
      { "name": "TS", "type": "DateTime" },
      { "name": "MEDIAN", "type": "Decimal" },
      { "name": "MIN", "type": "Decimal" },
      { "name": "MAX", "type": "Decimal" },
      { "name": "AVG", "type": "Decimal" },
      { "name": "STDEV", "type": "Decimal" },
      { "name": "FIRST_VALUE", "type": "Decimal" },
      { "name": "LAST_VALUE", "type": "Decimal" },
      { "name": "MESSAGE_COUNT", "type": "Int32" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
    }
  },
  "type": "Microsoft.DataFactory/factories/datasets"
}

我的问题是发布后什么都没发生。有什么建议吗?

Schedule触发器(计划触发器)不支持回填场景(根据您的触发器定义,开始时间为2018年8月17日)。计划触发器只能为当前时间及未来的时间段执行管道运行。

在您的情况下,对于回填场景,请使用翻转窗口触发器(Tumbling Window Trigger)。

相关内容

  • 没有找到相关文章

最新更新