我正在获取一个包含昨天日期的 txt 文件(在今天的日期(,我想在我的数据工厂管道中动态获取此文件名。
该文件自动放置在文件系统上,我想将此文件复制到 blob 存储 在下面的示例中,我通过从一个 blob 复制到另一个 blob 来模拟这一点。
例如:filename_2018-02-11.txt今天(2018-03-12(到达,日期为昨天(2018-02-11(。如何在今天的日期领取此文件?昨天的切片确实运行了,但还没有文件。
这是我的例子:
{"$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
"name": "CopyPipeline-fromBlobToBlob",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"sink": {
"type": "BlobSink",
"copyBehavior": "",
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
},
"enableSkipIncompatibleRow": true
},
"inputs": [
{
"name": "InputDataset-1"
}
],
"outputs": [
{
"name": "OutputDataset-1"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 1,
"executionPriorityOrder": "NewestFirst",
"style": "StartOfInterval",
"retry": 3,
"longRetry": 0,
"longRetryInterval": "00:00:00"
},
"scheduler": {
"frequency": "Day",
"interval": 1,
"offset": "05:00:00"
},
"name": "activity_00"
}
],
"start": "2018-03-07T00:00:00Z",
"end": "2020-03-08T00:00:00Z",
"isPaused": false,
"pipelineMode": "Scheduled"
}
}
您可以在策略中使用EndOfInterval
而不是StartOfInterval
。这将使用一天的结束而不是一天的开始来执行。如果文件在午夜不可用,您可能还需要设置适当的偏移量。
在 ADF v2 中,您可以使用内置变量 ( @pipeline().TriggerTime
(:
https://learn.microsoft.com/en-us/azure/data-factory/control-flow-system-variables
在源数据集(InputDataset-1(中,将文件路径/文件名如下所示:
@concat('YOUR BASE PATH IN BLOB/', 'filename_',
addhours(pipeline().TriggerTime, -1, 'yyyy'), '-',
addhours(pipeline().TriggerTime, -1, 'MM'), '-',
addhours(pipeline().TriggerTime, -1, 'dd'), '.txt'))
您也可以使用@trigger().scheduledTime
始终具有相同的日期,例如管道将失败。但请记住,它仅在触发器范围内可用。在我的测试中,它只针对计划触发器进行了评估。