如何使用Azure数据工厂从MySQL逐步导入数据到Azure数据仓库



我正在使用Azure Data Factory从MySQL定期导入数据到Azure SQL数据仓库。

数据通过Azure存储帐户上的分期BLOB存储,但是当我运行管道时,它会失败,因为它无法将Blob文本分离回列。管道试图插入目的地的每一行变成一个长字符串,其中包含由"⯑"字符界定的所有列值。

我以前使用了数据工厂,而无需尝试增量机制,并且效果很好。我看不出原因会导致这样的行为,但我可能缺少某些东西。

我正在附上用一些小的命名更改来描述管道的JSON,请告诉我,如果您看到任何可以解释这一点的东西。

谢谢!

编辑:添加异常消息:

失败的执行数据库操作失败。错误消息来自 数据库执行: errorcode = faileddboperation,'type = microsoft.datatransfer.com‌mon.shared.hybriddel‌ iveryException,messa‌GE =错误=错误 将数据加载到SQL数据时发生 仓库。,源= Microsoft.datatransfer.clientLibrary,''typ‌ e = system.data.sqlcli‌Ent.sqlexception,mes‌sage = query = query = query 中止 - 达到最大拒绝阈值(0行( 从外部来源读取:总计1行拒绝1行 处理。 (/F4AE80D1-4560-4AF9-9E74-05DE941725AC/DATA.8665812F-FBA1-40‌7A-9E04-9E04-2EE04-2EE5F3CA5A7E.TXT( 列序:27,预期数据类型:VARCHAR(45(整理SQL_LATIN1_GENERAL_CP1_CI_AS,有问题的值:*值行 *(令牌化失败(,错误:此列不够 线。,},],'。

{
"name": "CopyPipeline-move_incremental_test",
"properties": {
    "activities": [
        {
            "type": "Copy",
            "typeProperties": {
                "source": {
                    "type": "RelationalSource",
                    "query": "$$Text.Format('select * from [table] where InsertTime >= \'{0:yyyy-MM-dd HH:mm}\' AND InsertTime < \'{1:yyyy-MM-dd HH:mm}\'', WindowStart, WindowEnd)"
                },
                "sink": {
                    "type": "SqlDWSink",
                    "sqlWriterCleanupScript": "$$Text.Format('delete [schema].[table] where [InsertTime] >= \'{0:yyyy-MM-dd HH:mm}\' AND [InsertTime] <\'{1:yyyy-MM-dd HH:mm}\'', WindowStart, WindowEnd)",
                    "allowPolyBase": true,
                    "polyBaseSettings": {
                        "rejectType": "Value",
                        "rejectValue": 0,
                        "useTypeDefault": true
                    },
                    "writeBatchSize": 0,
                    "writeBatchTimeout": "00:00:00"
                },
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": "column1:column1,column2:column2,column3:column3"
                },
                "enableStaging": true,
                "stagingSettings": {
                    "linkedServiceName": "StagingStorage-somename",
                    "path": "somepath"
                }
            },
            "inputs": [
                {
                    "name": "InputDataset-input"
                }
            ],
            "outputs": [
                {
                    "name": "OutputDataset-output"
                }
            ],
            "policy": {
                "timeout": "1.00:00:00",
                "concurrency": 10,
                "style": "StartOfInterval",
                "retry": 3,
                "longRetry": 0,
                "longRetryInterval": "00:00:00"
            },
            "scheduler": {
                "frequency": "Hour",
                "interval": 1
            },
            "name": "Activity-0-_Custom query_->[schema]_[table]"
        }
    ],
    "start": "2017-06-01T05:29:12.567Z",
    "end": "2099-12-30T22:00:00Z",
    "isPaused": false,
    "hubName": "datafactory_hub",
    "pipelineMode": "Scheduled"
}

}

听起来您的工作正确,但是数据形成不佳(常见问题,无UTF-8编码(,因此ADF无法按照您的要求解析结构。当我遇到这一点时,我通常必须在管道中添加自定义活动,以清洁和准备数据,以便通过下游活动以结构化的方式使用它们。不幸的是,这在解决方案的开发中是一个很大的开销,需要您编写C#类来处理数据转换。

还请记住,ADF没有自己的计算,它仅调用其他服务,因此您还需要Azure Batch Service执行以编译代码。

可悲的是,这里没有神奇的修复。Azure非常适合提取和加载您的完美结构化数据,但是在现实世界中,我们需要其他服务来进行转换或清洁,这意味着我们需要一个可以ETL或我更喜欢ECTL的管道。

以下是创建ADF自定义活动的链接,可以使您开始:https://www.purplefrogsystems.com/paul/2016/11/creating-azure-azure-data-factory-custom-custom-actom-activities/

希望这会有所帮助。

我一直在使用同一消息来努力使用同一消息,当使用数据工厂v.2使用分期(这意味着Polybase(,从Azure SQL DB导入到Azure DWH时。我了解到,即使我收到的消息与此处提到的消息非常相似,即使我不是直接从SQL使用Polybase,而是通过Data Factory使用的,我收到的消息将失败。我收到的消息与此处提到的消息非常相似。

无论如何,对我的解决方案是避免用于十进制或数字类型的列的零值,例如isnull(mynumericcol,0(为mynumericcol。

相关内容

  • 没有找到相关文章

最新更新