我试图自定义导入选定的列从文件在谷歌云存储(GCS)到BigQuery,因为有列的语法问题,导致导入错误到BigQuery。因此,我想从导入BigQuery中排除一些特定的列。
我一直使用GCSToBigQueryOperator
从GCS导入文件到BigQuery。我尝试指定schema_fields,但似乎不能这样工作。是否有其他的气流操作符或其他方法将仅选定的列从GCS导入到BigQuery?
对于Airflow
,您有几种解决方案。如果只希望跳过多余的字段,可以将"GCSToBigQueryOperator
"中的"ignore_unknown_values
"参数设置为"True
"。对于更复杂的逻辑和应用特定转换的需要,我向您提出两个解决方案:
解决方案1
- 使用包含原始数据的staging表
- 截断分段表(选项WRITE_TRUNCATE)
- 用
GCSToBigQueryOperator
将文件插入到这个staging表 - 使用
BigQueryInsertJobOperator
只选择需要的字段,使用SQL
应用转换并将它们插入最终表
例子:
import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
with airflow.DAG(
"dag_id",
default_args={},
schedule_interval=None) as dag:
load_file_to_staging_table = GCSToBigQueryOperator(
task_id='load_file_to_staging_table',
bucket="source_bucket",
source_objects=[f'folder/*.json'],
destination_project_dataset_table='project:dataset.staging_table',
source_format='NEWLINE_DELIMITED_JSON',
compression=None,
skip_leading_rows=1,
create_disposition='CREATE_NEVER',
write_disposition='WRITE_TRUNCATE',
autodetect=True
)
insert_final_table_query = """
SELECT
field1,
field1
....
FROM `project.dataset.staging_table`
"""
insert_to_final_table = BigQueryInsertJobOperator(
task_id='insert_to_final_table',
configuration={
"query": {
"query": insert_final_table_query,
"useLegacySql": False,
'destinationTable': {
'projectId': "project",
'datasetId': "dataset",
'tableId': "final_table"
},
}
},
location='EU'
)
load_file_to_staging_table >> insert_to_final_table
对于SQL
查询,为了简单起见,我自愿在Python
代码中提出它们,但是您也可以使用.sql
文件+Jinja
模板来获得更优雅的解决方案。
解决方案2
- 使用
PythonOperator
- 在此操作符中,使用
Cloud Storage
Python
客户端检索Dict
列表中的文件数据。 - 在
Dict
列表中的Python
上应用所需的转换 - 使用
BigQuery
Python
客户端将数据保存到BigQuery
表
import json
from typing import List, Dict
import airflow
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from google.cloud import bigquery
def insert_file_to_bq(dataset, table):
file_data: List[Dict] = read_file_data_from_gcs()
# Apply your transformation in the list of Dict.
client = bigquery.Client()
client.insert_rows_json(f'{dataset}.{table}', file_data)
def read_file_data_from_gcs():
from google.cloud import storage
client = storage.Client()
bucket = client.get_bucket('test_bucket')
blob = bucket.get_blob('temp_files_folder/test.json')
result_bytes = blob.download_as_bytes()
return json.loads(result_bytes.decode('utf-8'))
with airflow.DAG(
"dag_id",
default_args={},
schedule_interval=None) as dag:
start_dag = DummyOperator(task_id='OK', dag=dag)
insert_file_to_bq_table = PythonOperator(
task_id="save_file_bq",
op_kwargs={
'dataset': 'dataset',
'table': 'table'
},
python_callable=insert_file_to_bq
)
start_dag >> insert_file_to_bq_table