通过此 https://learn.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-portal
我有一个名为 GetCurrentWatermarkValue with sqlReaderQuery 的查找活动:
Select WaterMarkValue as CurrentWatermarkValuenfrom WatermarkTable
我还有另一个名为 GetNewWatermarkValue 和 sqlReaderQuery 的活动:
select max(createdon) as NewWatermarkValue from shipment
然后,我尝试在源中使用它们进行数据复制活动
select *
from Shipment
where CreatedOn > '@{activity('GetCurrentWatermarkValue').output.firstRow.CurrentWatermarkValue}'
and CreatedOn <= '@{activity('GetNewWatermarkValue').output.firstRow.NewWatermarkValue}'
预览数据按钮是灰色的(但在我删除 where 条件时启用(,所以
明显有问题设置接收器后,我尝试设置映射。单击"映射"选项卡上的"导入架构"可得到:
A database operation failed with the following error: 'Incorrect syntax near 'GetCurrentWatermarkValue'.'. Activity ID:98794aa9-c866-48d6-b9ff-9cb277bac6ed
我想也许我应该使用添加动态内容选项,但这只会给出
Query is required
我在某处读到,在查找活动中设置了"仅第一行"时,第一行之后的文本。 应该是 [表名],但这似乎不对。
查找:
{
"name": "GetCurrentWatermarkValue",
"type": "Lookup",
"policy": ...,
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "Select WaterMarkValue as CurrentWatermarkValuenfrom WatermarkTable"
},
"dataset": {
"referenceName": "WatermarkTable",
"type": "DatasetReference"
}
}
}
查找:
{
"name": "GetNewWatermarkValue",
"type": "Lookup",
"policy": ...,
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "select max(createdon) as NewWatermarkValue from shipment"
},
"dataset": {
"referenceName": "ShipmentsTable",
"type": "DatasetReference"
}
}
}
数据复制:
{
"name": "ArchiveShipments",
"type": "Copy",
"dependsOn": [
{
"activity": "GetCurrentWatermarkValue",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "GetNewWatermarkValue",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": ...,
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "select *nfrom Shipmentnwhere CreatedOn > '@{activity('GetCurrentWatermarkValue').output.firstRow.CurrentWatermarkValue}' nand CreatedOn <= '@{activity('GetNewWatermarkValue').output.firstRow.NewWatermarkValue}'",
"type": "Expression"
}
},
"sink": {
"type": "AzureSqlSink"
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "ShipmentsTable",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "ShipmentArchiveTable",
"type": "DatasetReference"
}
]
}
这看起来像是一个语法问题,请尝试这是复制活动查询:
@{CONCAT('select * from Shipment where CreatedOn > ', activity('GetCurrentWatermarkValue').output.firstRow.CurrentWatermarkValue, ' and CreatedOn <= ', activity('GetNewWatermarkValue').output.firstRow.NewWatermarkValue}
除非要手动设置,否则不要使用映射,如果列名相同,则只需单击"清除"。
希望这有帮助!