谷歌大查询在加载作业上忽略未知值选项的意外行为(额外列接收错误)



我正在使用Airflow在Google BigQuery中触发加载作业。源文件由多个 NDJSON 文件组成。

这是气流运算符(我认为不相关。

load = GoogleCloudStorageToBigQueryOperator(
task_id=f"load",
bigquery_conn_id="bigquery_default",
pool="bigquery_insert",
destination_project_dataset_table="<HIDDEN>",
bucket="<HIDDEN>",
source_objects=list_files(),
source_format="NEWLINE_DELIMITED_JSON",
write_disposition="WRITE_APPEND",
autodetect=True,
ignore_unknown_values=True
)

为了检查它是否不是Airflow故障,我已经调试并提取了Airflow发送到Google BigQuery REST API的有效负载:

{ 
"configuration":{ 
"load":{ 
"autodetect":True,
"createDisposition":"CREATE_IF_NEEDED",
"destinationTable":{ 
"projectId":"<PRIVATE>",
"datasetId":"<PRIVATE>",
"tableId":"<PRIVATE>"
},
"sourceFormat":"NEWLINE_DELIMITED_JSON",
"sourceUris":[ 
"<PRIVATE>"
],
"writeDisposition":"WRITE_APPEND",
"ignoreUnknownValues":True
}
}
}

由于我正在设置选项ignoreUnknownValues(文档(,我希望将忽略源文件中但不在目标架构中的 JSON 字段,但我从 BigQuery 返回以下错误:

异常:大查询作业失败。最后一个错误是:{'原因': '无效', "消息": "提供的架构与表 [PRIVATE] 不匹配。无法添加 字段(字段:source_fingerprint('}。工作是:{'kind': 'bigquery#job', 'etag': '[PRIVATE]', 'id': '[PRIVATE]', 'selfLink': "[专用]", "user_email": "[专用]", "配置": {"加载": {'sourceUris': [[PRIVATE]], 'destinationTable': {'projectId': '[PRIVATE]', 'datasetId': 'airflow', 'tableId': '[PRIVATE]'}, "创建处置": "CREATE_IF_NEEDED", "写处置": 'WRITE_APPEND', '源格式': 'NEWLINE_DELIMITED_JSON', 'ignoreUnknownValues': True, 'autodetect': True}, 'jobType': 'LOAD'}, 'jobReference': {'projectId': '[PRIVATE]', 'jobId': '[PRIVATE]', 'location': 'EU'}, 'statistics': {'creationTime': '1581675754961', "开始时间": "1581675755090", "结束时间": "1581675755491"}, "状态": {'errorResult': {'reason': 'invalid', 'message': '提供的架构确实如此 不匹配表 [私有]。无法添加字段 (字段: source_fingerprint('}, '错误': [{'原因': '无效', '消息': '提供的架构与表 [私有] 不匹配。无法添加字段 (字段: source_fingerprint('}], '状态': '完成'}}

请注意,我的ignoreUnknownValues选项也会回到响应中,因此他们这边理解。

我希望忽略额外的列并且作业成功完成,根据文档:

忽略未知值:布尔值

[可选] 指示 BigQuery 是否应允许以下额外值: 未在表架构中表示。如果为 true,则额外值为 忽视。如果为 false,则包含额外列的记录将被视为错误 记录,如果错误记录太多,则无效错误为 在作业结果中返回。默认值为 false。这 sourceFormat 属性确定 BigQuery 将哪些内容视为额外的内容 值:CSV:尾随列 JSON:与任何值都不匹配的命名值 列名

有谁知道发生了什么?

请注意,我不想更新我的架构(因此,我没有使用选项schemaUpdateOptions(。我希望忽略多余的列。

谢谢

--

更新 1:我使用的是 Airflow 1.10.3,它已经支持此选项的此语法。旧版本的Airflow有不同的方式传递此参数,但正如我们在我发布的有效载荷上看到的那样,Airflow似乎正在向Google BigQuery API发送正确的选项(相关问题不适用(。

更新2:同样在使用CLI时,我收到相同的错误。

bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --noreplace --ignore_unknown_values [MY TABLE NAME] [MY GCS PATH]

等待bqjob_[...]_1 ...(0s( 当前状态:完成大查询错误 在加载操作中:处理作业"[...]"时出错:提供的架构执行 不匹配表 [...]。无法添加字段 (字段: metadata_deposit_00_sourceId(

更新3:当我同时使用autodetectignore_unkown_values时,似乎发生了问题。如果我提供现有架构作为schema_fields,那么ignore_unkown_values按预期工作,但这在文档中对我来说不是很清楚。

我找到了一种在使用自动检测时忽略未识别值的方法,而无需传递架构,本质上是通过使用运算符上的源格式特定参数来复制典型的 bq CLI 命令 (src_fmt_configs(:bq load --source_format=NEWLINE_DELIMITED_JSON --autodetect dataset.table table_source

to_bq = GCSToBigQueryOperator(
task_id="to_bq",
bucket=YOUR_BUCKET_NAME,
source_objects=[YOUR_SOURCE_OBJECTS],
destination_project_dataset_table=YOUR_TABLE,
source_format="NEWLINE_DELIMITED_JSON",
src_fmt_configs={"ignoreUnknownValues": True, "autodetect": False},
write_disposition=YOUR_WRITE_DISPOSITION,
)

如果需要指定分区字段,请在运算符参数time_partitioning中执行此操作。

最新更新