我正在使用Airflow在Google BigQuery中触发加载作业。源文件由多个 NDJSON 文件组成。
这是气流运算符(我认为不相关。
load = GoogleCloudStorageToBigQueryOperator(
task_id=f"load",
bigquery_conn_id="bigquery_default",
pool="bigquery_insert",
destination_project_dataset_table="<HIDDEN>",
bucket="<HIDDEN>",
source_objects=list_files(),
source_format="NEWLINE_DELIMITED_JSON",
write_disposition="WRITE_APPEND",
autodetect=True,
ignore_unknown_values=True
)
为了检查它是否不是Airflow故障,我已经调试并提取了Airflow发送到Google BigQuery REST API的有效负载:
{
"configuration":{
"load":{
"autodetect":True,
"createDisposition":"CREATE_IF_NEEDED",
"destinationTable":{
"projectId":"<PRIVATE>",
"datasetId":"<PRIVATE>",
"tableId":"<PRIVATE>"
},
"sourceFormat":"NEWLINE_DELIMITED_JSON",
"sourceUris":[
"<PRIVATE>"
],
"writeDisposition":"WRITE_APPEND",
"ignoreUnknownValues":True
}
}
}
由于我正在设置选项ignoreUnknownValues
(文档(,我希望将忽略源文件中但不在目标架构中的 JSON 字段,但我从 BigQuery 返回以下错误:
异常:大查询作业失败。最后一个错误是:{'原因': '无效', "消息": "提供的架构与表 [PRIVATE] 不匹配。无法添加 字段(字段:source_fingerprint('}。工作是:{'kind': 'bigquery#job', 'etag': '[PRIVATE]', 'id': '[PRIVATE]', 'selfLink': "[专用]", "user_email": "[专用]", "配置": {"加载": {'sourceUris': [[PRIVATE]], 'destinationTable': {'projectId': '[PRIVATE]', 'datasetId': 'airflow', 'tableId': '[PRIVATE]'}, "创建处置": "CREATE_IF_NEEDED", "写处置": 'WRITE_APPEND', '源格式': 'NEWLINE_DELIMITED_JSON', 'ignoreUnknownValues': True, 'autodetect': True}, 'jobType': 'LOAD'}, 'jobReference': {'projectId': '[PRIVATE]', 'jobId': '[PRIVATE]', 'location': 'EU'}, 'statistics': {'creationTime': '1581675754961', "开始时间": "1581675755090", "结束时间": "1581675755491"}, "状态": {'errorResult': {'reason': 'invalid', 'message': '提供的架构确实如此 不匹配表 [私有]。无法添加字段 (字段: source_fingerprint('}, '错误': [{'原因': '无效', '消息': '提供的架构与表 [私有] 不匹配。无法添加字段 (字段: source_fingerprint('}], '状态': '完成'}}
请注意,我的ignoreUnknownValues
选项也会回到响应中,因此他们这边理解。
我希望忽略额外的列并且作业成功完成,根据文档:
忽略未知值:布尔值
[可选] 指示 BigQuery 是否应允许以下额外值: 未在表架构中表示。如果为 true,则额外值为 忽视。如果为 false,则包含额外列的记录将被视为错误 记录,如果错误记录太多,则无效错误为 在作业结果中返回。默认值为 false。这 sourceFormat 属性确定 BigQuery 将哪些内容视为额外的内容 值:CSV:尾随列 JSON:与任何值都不匹配的命名值 列名
有谁知道发生了什么?
请注意,我不想更新我的架构(因此,我没有使用选项schemaUpdateOptions
(。我希望忽略多余的列。
谢谢
--
更新 1:我使用的是 Airflow 1.10.3,它已经支持此选项的此语法。旧版本的Airflow有不同的方式传递此参数,但正如我们在我发布的有效载荷上看到的那样,Airflow似乎正在向Google BigQuery API发送正确的选项(相关问题不适用(。
更新2:同样在使用CLI时,我收到相同的错误。
bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --noreplace --ignore_unknown_values [MY TABLE NAME] [MY GCS PATH]
等待bqjob_[...]_1 ...(0s( 当前状态:完成大查询错误 在加载操作中:处理作业"[...]"时出错:提供的架构执行 不匹配表 [...]。无法添加字段 (字段: metadata_deposit_00_sourceId(
更新3:当我同时使用autodetect
和ignore_unkown_values
时,似乎发生了问题。如果我提供现有架构作为schema_fields
,那么ignore_unkown_values
按预期工作,但这在文档中对我来说不是很清楚。
我找到了一种在使用自动检测时忽略未识别值的方法,而无需传递架构,本质上是通过使用运算符上的源格式特定参数来复制典型的 bq CLI 命令 (src_fmt_configs(:bq load --source_format=NEWLINE_DELIMITED_JSON --autodetect dataset.table table_source
to_bq = GCSToBigQueryOperator(
task_id="to_bq",
bucket=YOUR_BUCKET_NAME,
source_objects=[YOUR_SOURCE_OBJECTS],
destination_project_dataset_table=YOUR_TABLE,
source_format="NEWLINE_DELIMITED_JSON",
src_fmt_configs={"ignoreUnknownValues": True, "autodetect": False},
write_disposition=YOUR_WRITE_DISPOSITION,
)
如果需要指定分区字段,请在运算符参数time_partitioning
中执行此操作。