我正在使用Azure Data Factory从MySQL定期导入数据到Azure SQL数据仓库。
数据通过Azure存储帐户上的分期BLOB存储,但是当我运行管道时,它会失败,因为它无法将Blob文本分离回列。管道试图插入目的地的每一行变成一个长字符串,其中包含由"⯑"字符界定的所有列值。
我以前使用了数据工厂,而无需尝试增量机制,并且效果很好。我看不出原因会导致这样的行为,但我可能缺少某些东西。
我正在附上用一些小的命名更改来描述管道的JSON,请告诉我,如果您看到任何可以解释这一点的东西。
谢谢!
编辑:添加异常消息:
失败的执行数据库操作失败。错误消息来自 数据库执行: errorcode = faileddboperation,'type = microsoft.datatransfer.common.shared.hybriddel iveryException,messaGE =错误=错误 将数据加载到SQL数据时发生 仓库。,源= Microsoft.datatransfer.clientLibrary,''typ e = system.data.sqlcliEnt.sqlexception,message = query = query = query 中止 - 达到最大拒绝阈值(0行( 从外部来源读取:总计1行拒绝1行 处理。 (/F4AE80D1-4560-4AF9-9E74-05DE941725AC/DATA.8665812F-FBA1-407A-9E04-9E04-2EE04-2EE5F3CA5A7E.TXT( 列序:27,预期数据类型:VARCHAR(45(整理SQL_LATIN1_GENERAL_CP1_CI_AS,有问题的值:*值行 *(令牌化失败(,错误:此列不够 线。,},],'。
{
"name": "CopyPipeline-move_incremental_test",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "RelationalSource",
"query": "$$Text.Format('select * from [table] where InsertTime >= \'{0:yyyy-MM-dd HH:mm}\' AND InsertTime < \'{1:yyyy-MM-dd HH:mm}\'', WindowStart, WindowEnd)"
},
"sink": {
"type": "SqlDWSink",
"sqlWriterCleanupScript": "$$Text.Format('delete [schema].[table] where [InsertTime] >= \'{0:yyyy-MM-dd HH:mm}\' AND [InsertTime] <\'{1:yyyy-MM-dd HH:mm}\'', WindowStart, WindowEnd)",
"allowPolyBase": true,
"polyBaseSettings": {
"rejectType": "Value",
"rejectValue": 0,
"useTypeDefault": true
},
"writeBatchSize": 0,
"writeBatchTimeout": "00:00:00"
},
"translator": {
"type": "TabularTranslator",
"columnMappings": "column1:column1,column2:column2,column3:column3"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": "StagingStorage-somename",
"path": "somepath"
}
},
"inputs": [
{
"name": "InputDataset-input"
}
],
"outputs": [
{
"name": "OutputDataset-output"
}
],
"policy": {
"timeout": "1.00:00:00",
"concurrency": 10,
"style": "StartOfInterval",
"retry": 3,
"longRetry": 0,
"longRetryInterval": "00:00:00"
},
"scheduler": {
"frequency": "Hour",
"interval": 1
},
"name": "Activity-0-_Custom query_->[schema]_[table]"
}
],
"start": "2017-06-01T05:29:12.567Z",
"end": "2099-12-30T22:00:00Z",
"isPaused": false,
"hubName": "datafactory_hub",
"pipelineMode": "Scheduled"
}
}
听起来您的工作正确,但是数据形成不佳(常见问题,无UTF-8编码(,因此ADF无法按照您的要求解析结构。当我遇到这一点时,我通常必须在管道中添加自定义活动,以清洁和准备数据,以便通过下游活动以结构化的方式使用它们。不幸的是,这在解决方案的开发中是一个很大的开销,需要您编写C#类来处理数据转换。
还请记住,ADF没有自己的计算,它仅调用其他服务,因此您还需要Azure Batch Service执行以编译代码。
可悲的是,这里没有神奇的修复。Azure非常适合提取和加载您的完美结构化数据,但是在现实世界中,我们需要其他服务来进行转换或清洁,这意味着我们需要一个可以ETL或我更喜欢ECTL的管道。
以下是创建ADF自定义活动的链接,可以使您开始:https://www.purplefrogsystems.com/paul/2016/11/creating-azure-azure-data-factory-custom-custom-actom-activities/
希望这会有所帮助。
我一直在使用同一消息来努力使用同一消息,当使用数据工厂v.2使用分期(这意味着Polybase(,从Azure SQL DB导入到Azure DWH时。我了解到,即使我收到的消息与此处提到的消息非常相似,即使我不是直接从SQL使用Polybase,而是通过Data Factory使用的,我收到的消息将失败。我收到的消息与此处提到的消息非常相似。
无论如何,对我的解决方案是避免用于十进制或数字类型的列的零值,例如isnull(mynumericcol,0(为mynumericcol。