How to run a Cloud Composer task that loads data into a BigQuery table in another project



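I created my Cloud Composer environment under project A, and I want to load data into a BigQuery table in another project, B. I know the operator for this is GCSToBigQueryOperator, but the task fails instead of succeeding. How can I run a task from project A that loads data into a table in project B?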

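Based on my experience and my assumptions about your setup, I think you need to make sure that the service account behind your connections (bigquery_conn_id, google_cloud_storage_conn_id) has sufficient permissions in both projects.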

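As shankshera mentioned, first check in GCP IAM whether the service account used by your Cloud Composer environment has access to both projects (and to the dataset in BigQuery).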
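To make that check concrete, here is a minimal sketch that compares the roles you see in IAM against a set the load is likely to need. The project IDs, the assumption that the bucket lives in project A, and the exact role set are all illustrative assumptions, not something stated in the question; stricter or broader roles may apply in your organisation.

```python
# Assumed minimal roles for a cross-project load job (an assumption, adjust as needed):
# - the job runs in project A and reads the CSV from a bucket in project A
# - the destination table lives in project B
REQUIRED_ROLES = {
    "project-a": {"roles/bigquery.jobUser", "roles/storage.objectViewer"},
    "project-b": {"roles/bigquery.dataEditor"},
}


def missing_roles(granted, required=REQUIRED_ROLES):
    """Return {project: sorted list of missing roles} for projects with gaps."""
    gaps = {}
    for project, roles in required.items():
        missing = roles - set(granted.get(project, ()))
        if missing:
            gaps[project] = sorted(missing)
    return gaps


# Hypothetical grants, copied by hand from the IAM page of each project
# for the Composer environment's service account.
granted = {
    "project-a": ["roles/bigquery.jobUser", "roles/storage.objectViewer"],
    "project-b": [],
}
print(missing_roles(granted))  # {'project-b': ['roles/bigquery.dataEditor']}
```

A non-empty result for project B would be consistent with the failure described in the question, although the task log is the place to confirm the actual error.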

To be honest, I couldn't get this operator to work properly for me either, so I wrote a custom Python function that does the same thing.

import ast

from google.cloud import bigquery


def load_into_table_from_csv(**kwargs):
    """
    Loads data into the specified BQ table from the specified CSV file in GCS.

    Receives table_path and file_path from the PythonOperator in Airflow.
    Parameters need to be explicitly specified in op_kwargs in the task definition.
    Example of op_kwargs for PythonOperator:
    {'table_path': 'project_id.dataset_id.table_id',
     'file_path': 'gs://bucket_name/file_name.csv',
     'delimiter': ',',
     'quote_character': '"'}
    """
    bigquery_client = bigquery.Client()
    table_id = kwargs["table_path"]

    # file_path may be a single URI or the string form of a list of URIs.
    # ast.literal_eval parses the list form without executing arbitrary code, unlike eval.
    try:
        file = ast.literal_eval(kwargs["file_path"])
    except (ValueError, SyntaxError, TypeError):
        file = kwargs["file_path"]

    job_config = bigquery.LoadJobConfig()
    job_config.field_delimiter = kwargs["delimiter"]  # delimiter in the source file
    job_config.skip_leading_rows = 1  # how many rows to skip (set to 1 if you have a header row)
    job_config.quote_character = kwargs["quote_character"]
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.write_disposition
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    load_job = bigquery_client.load_table_from_uri(file, table_id, job_config=job_config)
    assert load_job.job_type == "load"
    load_job.result()  # Waits for table load to complete.
    assert load_job.state == "DONE"
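One detail matters for the cross-project case: bigquery.Client() with no arguments picks up the default project of the environment it runs in (in Composer, presumably project A), so it is the project prefix in table_path that sends the load to project B. As far as I can tell, a two-part path such as dataset_id.table_id would be resolved against the client's default project instead. A small guard like the following sketch can catch that early. The helper name parse_table_path and the sample IDs are hypothetical, and the sketch does not handle legacy domain-scoped project IDs that contain dots.

```python
def parse_table_path(table_path):
    """Split 'project_id.dataset_id.table_id' into its three parts.

    Raises ValueError when the project prefix (or any other part) is missing,
    so a load cannot silently land in the client's default project.
    """
    parts = table_path.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            f"expected 'project_id.dataset_id.table_id', got {table_path!r}"
        )
    return tuple(parts)


# Hypothetical destination table in project B.
project, dataset, table = parse_table_path("project-b.sales.orders")
print(project, dataset, table)
```

Calling it on kwargs['table_path'] at the top of the load function would fail the task with a clear message rather than writing to the wrong project.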

You can use this function directly and pass it arguments like this:

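# Airflow 2 import path; on Airflow 1.x use airflow.operators.python_operator instead.
from airflow.operators.python import PythonOperator

# table_name, specs_current_table and dag are defined elsewhere in the DAG file.
t8 = PythonOperator(
    task_id=f"load_{table_name}",
    python_callable=load_into_table_from_csv,  # function that's called by the task
    op_kwargs=specs_current_table,  # passing arguments into the function
    dag=dag,
)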
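The answer does not show how specs_current_table is built. One possible shape, sketched under the assumption that each table is loaded from a CSV file named after it, is a dict of op_kwargs keyed by table name. The helper build_load_specs, the bucket name, and the project and dataset IDs are all hypothetical.

```python
def build_load_specs(project, dataset, bucket, table_names,
                     delimiter=",", quote_character='"'):
    """Build one op_kwargs dict per table, keyed by table name."""
    specs = {}
    for name in table_names:
        specs[name] = {
            # Fully qualified, so the load targets `project` (project B here).
            "table_path": f"{project}.{dataset}.{name}",
            # Assumes one CSV per table, named after the table.
            "file_path": f"gs://{bucket}/{name}.csv",
            "delimiter": delimiter,
            "quote_character": quote_character,
        }
    return specs


specs = build_load_specs("project-b", "sales", "my-bucket", ["orders", "customers"])

# In the DAG file, one task per table could then be created in a loop:
# for table_name, specs_current_table in specs.items():
#     PythonOperator(task_id=f"load_{table_name}", ...)
for table_name, specs_current_table in specs.items():
    print(table_name, specs_current_table["table_path"])
```

Keeping the specs in plain data like this means a new table is a one-line change to the list rather than a new task definition.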

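As a side note, I personally agree with the author of this article, https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753, that we should be careful about using too many custom operators when we can do the same thing with plain code.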
