How to exclude rows in a Copy Data activity in Azure Data Factory



I have built a pipeline with a single Copy Data activity that copies data from Azure Data Lake and writes the output to Azure Blob Storage.

In the output I can see that some of my rows have no data, and I would like to exclude them from the copy. In the example below, the second row contains no useful data:

{"TenantId":"qa","Timestamp":"2019-03-06T10:53:51.634Z","PrincipalId":2,"ControlId":"729c3b6e-0442-4884-936c-c36c9b466e9d","ZoneInternalId":0,"IsAuthorized":true,"PrincipalName":"John","StreetName":"Rue 1","ExemptionId":8}
{"TenantId":"qa","Timestamp":"2019-03-06T10:59:09.74Z","PrincipalId":null,"ControlId":null,"ZoneInternalId":null,"IsAuthorized":null,"PrincipalName":null,"StreetName":null,"ExemptionId":null}
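The rule I am after could be sketched like this (plain Python, purely to illustrate the intended filter; the field names come from the sample rows above, and this is not part of the pipeline itself):

```python
import json

# Fields that are always populated and therefore don't indicate a useful row.
KEEP_ALWAYS = {"TenantId", "Timestamp"}

def has_useful_data(row: dict) -> bool:
    """Return True if at least one field outside KEEP_ALWAYS is non-null."""
    return any(v is not None for k, v in row.items() if k not in KEEP_ALWAYS)

lines = [
    '{"TenantId":"qa","Timestamp":"2019-03-06T10:53:51.634Z","PrincipalId":2,'
    '"ControlId":"729c3b6e-0442-4884-936c-c36c9b466e9d","ZoneInternalId":0,'
    '"IsAuthorized":true,"PrincipalName":"John","StreetName":"Rue 1","ExemptionId":8}',
    '{"TenantId":"qa","Timestamp":"2019-03-06T10:59:09.74Z","PrincipalId":null,'
    '"ControlId":null,"ZoneInternalId":null,"IsAuthorized":null,'
    '"PrincipalName":null,"StreetName":null,"ExemptionId":null}',
]

kept = [line for line in lines if has_useful_data(json.loads(line))]
print(len(kept))  # 1: only the first row survives the filter
```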

Question

In the Copy Data activity, how can I define a rule to exclude rows that are missing certain values?

Here is the code of my pipeline:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy from Data Lake to Blob",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [
                    {
                        "name": "Source",
                        "value": "tenantdata/events/"
                    },
                    {
                        "name": "Destination",
                        "value": "controls/"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "AzureDataLakeStoreSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "BlobSink",
                        "copyBehavior": "MergeFiles"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "Body.TenantId": "TenantId",
                            "Timestamp": "Timestamp",
                            "Body.PrincipalId": "PrincipalId",
                            "Body.ControlId": "ControlId",
                            "Body.ZoneId": "ZoneInternalId",
                            "Body.IsAuthorized": "IsAuthorized",
                            "Body.PrincipalName": "PrincipalName",
                            "Body.StreetName": "StreetName",
                            "Body.Exemption.Kind": "ExemptionId"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "qadl",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "datalakestaging",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    }
}
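For context, the `TabularTranslator` above maps dotted source paths such as `Body.TenantId` onto flat output columns. A minimal Python sketch of that dotted-path flattening (illustrative only; the sample event below is made up, missing paths simply become `None`):

```python
# Mapping copied from the pipeline's TabularTranslator above.
COLUMN_MAPPINGS = {
    "Body.TenantId": "TenantId",
    "Timestamp": "Timestamp",
    "Body.PrincipalId": "PrincipalId",
    "Body.ControlId": "ControlId",
    "Body.ZoneId": "ZoneInternalId",
    "Body.IsAuthorized": "IsAuthorized",
    "Body.PrincipalName": "PrincipalName",
    "Body.StreetName": "StreetName",
    "Body.Exemption.Kind": "ExemptionId",
}

def resolve(record: dict, dotted: str):
    """Walk a dotted path into nested dicts; None when any segment is missing."""
    cur = record
    for part in dotted.split("."):
        if not isinstance(cur, dict) or part not in cur:
            return None
        cur = cur[part]
    return cur

def flatten(record: dict) -> dict:
    """Apply the column mapping, producing one flat output row."""
    return {out: resolve(record, src) for src, out in COLUMN_MAPPINGS.items()}

# Hypothetical source event with a nested Body, for illustration.
event = {
    "Timestamp": "2019-03-06T10:53:51.634Z",
    "Body": {"TenantId": "qa", "PrincipalId": 2, "Exemption": {"Kind": 8}},
}
row = flatten(event)
print(row["TenantId"], row["ExemptionId"])  # qa 8
```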

This is a very good question (+1). I had the same problem, and I was surprised that I could not find anything in the Copy activity to handle this (I even tried the fault tolerance feature, but no luck).

Given that I was already doing other transformations with U-SQL, I ended up using it for this as well. So instead of a Copy activity, I have a U-SQL activity in ADF that filters with the IS NOT NULL operator. Depending on your data, the missing values may instead appear as the literal string "null" or as an empty string, so adjust the predicate accordingly. Here is how it looks:

DECLARE @file_set_path string = "adl://myadl.azuredatalake.net/Samples/Data/{date_utc:yyyy}{date_utc:MM}{date_utc:dd}T{date_utc:HH}{date_utc:mm}{date_utc:ss}Z.txt";
@data =
    EXTRACT 
            [id] string,
            date_utc DateTime
    FROM @file_set_path
    USING Extractors.Text(delimiter : '\u0001', skipFirstNRows : 1, quoting : false);
@result =
    SELECT 
            [id] ,
            date_utc.ToString("yyyy-MM-ddTHH:mm:ss") AS SourceExtractDateUTC
    FROM @data
    WHERE id IS NOT NULL -- you can also use WHERE id <> "" or <> "NULL";
OUTPUT @result TO "wasb://samples@mywasb/Samples/Data/searchlog.tsv" USING Outputters.Text(delimiter : '\u0001', outputHeader : true);

Note: both ADLS and Blob Storage are supported as input/output file stores.

Let me know if this helps, or if the example above does not work for your data. It would be great if someone could post an answer that uses the Copy activity, but this is one possibility for now.
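To illustrate the alternative predicates mentioned above (`IS NOT NULL` versus comparing against `""` or `"NULL"`), here is a small Python sketch of a combined "missing value" check (illustrative only, not part of the ADF/U-SQL solution):

```python
def is_missing(value) -> bool:
    """Treat real nulls, empty strings, and the literal strings "NULL"/"null"
    as missing -- mirroring the WHERE id IS NOT NULL / id <> "" / id <> "NULL"
    variants from the U-SQL script above."""
    return value is None or value == "" or str(value).upper() == "NULL"

samples = [None, "", "NULL", "null", "729c3b6e"]
print([is_missing(s) for s in samples])  # [True, True, True, True, False]
```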
