如何自定义导入选定的列从文件在GCS到BigQuery?



我试图自定义导入选定的列从文件在谷歌云存储(GCS)到BigQuery,因为有列的语法问题,导致导入错误到BigQuery。因此,我想从导入BigQuery中排除一些特定的列。

我一直使用GCSToBigQueryOperator从GCS导入文件到BigQuery。我尝试指定schema_fields,但似乎不能这样工作。是否有其他的气流操作符或其他方法将仅选定的列从GCS导入到BigQuery?

对于Airflow,您有几种解决方案。如果只希望跳过多余的字段,可以将"GCSToBigQueryOperator"中的"ignore_unknown_values"参数设置为"True"。对于更复杂的逻辑和应用特定转换的需要,我向您提出两个解决方案:

解决方案1

  • 使用包含原始数据的staging表
  • 截断分段表(选项WRITE_TRUNCATE)
  • GCSToBigQueryOperator
  • 将文件插入到这个staging表
  • 使用BigQueryInsertJobOperator只选择需要的字段,使用SQL应用转换并将它们插入最终表

例子:

import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
with airflow.DAG(
"dag_id",
default_args={},
schedule_interval=None) as dag:
load_file_to_staging_table = GCSToBigQueryOperator(
task_id='load_file_to_staging_table',
bucket="source_bucket",
source_objects=[f'folder/*.json'],
destination_project_dataset_table='project:dataset.staging_table',
source_format='NEWLINE_DELIMITED_JSON',
compression=None,
skip_leading_rows=1,
create_disposition='CREATE_NEVER',
write_disposition='WRITE_TRUNCATE',
autodetect=True
)
insert_final_table_query = """
SELECT 
field1,
field1
....
FROM `project.dataset.staging_table`
"""
insert_to_final_table = BigQueryInsertJobOperator(
task_id='insert_to_final_table',
configuration={
"query": {
"query": insert_final_table_query,
"useLegacySql": False,
'destinationTable': {
'projectId': "project",
'datasetId': "dataset",
'tableId': "final_table"
},
}
},
location='EU'
)
load_file_to_staging_table >> insert_to_final_table

对于SQL查询,为了简单起见,我自愿在Python代码中提出它们,但是您也可以使用.sql文件+Jinja模板来获得更优雅的解决方案。

解决方案2

  • 使用PythonOperator
  • 在此操作符中,使用Cloud StoragePython客户端检索Dict
  • 列表中的文件数据。
  • Dict列表中的Python上应用所需的转换
  • 使用BigQueryPython客户端将数据保存到BigQuery
import json
from typing import List, Dict
import airflow
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from google.cloud import bigquery

def insert_file_to_bq(dataset, table):
file_data: List[Dict] = read_file_data_from_gcs()

# Apply your transformation in the list of Dict.
client = bigquery.Client()
client.insert_rows_json(f'{dataset}.{table}', file_data)

def read_file_data_from_gcs():
from google.cloud import storage
client = storage.Client()
bucket = client.get_bucket('test_bucket')
blob = bucket.get_blob('temp_files_folder/test.json')
result_bytes = blob.download_as_bytes()
return json.loads(result_bytes.decode('utf-8'))

with airflow.DAG(
"dag_id",
default_args={},
schedule_interval=None) as dag:
start_dag = DummyOperator(task_id='OK', dag=dag)
insert_file_to_bq_table = PythonOperator(
task_id="save_file_bq",
op_kwargs={
'dataset': 'dataset',
'table': 'table'
},
python_callable=insert_file_to_bq
)
start_dag >> insert_file_to_bq_table

最新更新