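I am new to Apache Airflow. My task is to read data from Google Cloud Storage, transform it, and upload the transformed data into a BigQuery table. I can already fetch the data from the Cloud Storage bucket and load it directly into a BigQuery table. What I don't know is how to include a transformation function in this pipeline.
Here is my code: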
# Import libraries needed for the operation
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# Default Argument
default_args = {
    'owner': <OWNER_NAME>,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}
# DAG Definition
dag = DAG('load_from_bucket_to_bq',
          schedule_interval='0 * * * *',
          default_args=default_args)
# Variable Configurations
BQ_CONN_ID = <CONN_ID>
BQ_PROJECT = <PROJECT_ID>
BQ_DATASET = <DATASET_ID>
with dag:
    # Tasks
    start = DummyOperator(
        task_id='start'
    )
    upload = GoogleCloudStorageToBigQueryOperator(
        task_id='load_from_bucket_to_bigquery',
        bucket=<BUCKET_NAME>,
        source_objects=['*.csv'],
        schema_fields=[
            {'name': 'Active_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Last_Update', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Recovered', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        destination_project_dataset_table=BQ_PROJECT + '.' + BQ_DATASET + '.' + <TABLE_NAME>,
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id=BQ_CONN_ID,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )
    end = DummyOperator(
        task_id='end'
    )
    # Setting Dependencies
    start >> upload >> end
Any help on how to proceed would be appreciated. Thanks.
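Posting the conversation with @sachinb27 as an answer. The transformation can be placed in a Python function, and a PythonOperator can call that function at runtime as a task in the DAG. For more details on which operators are available in Airflow, see the Airflow operators documentation.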
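As a rough illustration of that suggestion, here is a minimal sketch, not a definitive implementation. It assumes Python 3 and Airflow 1.10-style imports (matching the `airflow.contrib` paths in the question). The cleaning rule (removing `,` and `+` from the count columns), the helper names `transform_csv`, `transform_object` and `build_transform_task`, and the bucket, object and connection names are all hypothetical placeholders rather than anything from the original post.

```python
import csv
import io

# Hypothetical choice: columns assumed to hold counts such as "+1,234".
NUMERIC_COLUMNS = ("Active_Cases", "New_Cases", "New_Deaths",
                   "Total_Cases", "Total_Deaths", "Total_Recovered")


def transform_csv(text):
    """Return CSV text with ',' and '+' removed from the count columns."""
    reader = csv.DictReader(io.StringIO(text))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames,
                            lineterminator="\n")
    writer.writeheader()
    for row in reader:
        for col in NUMERIC_COLUMNS:
            if row.get(col):
                row[col] = row[col].replace(",", "").replace("+", "").strip()
        writer.writerow(row)
    return out.getvalue()


def transform_object(bucket, source_object, dest_object, conn_id):
    """Callable for PythonOperator: download one CSV, clean it, upload it."""
    # Imported lazily so transform_csv can be tested without Airflow installed.
    import os
    import tempfile
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=conn_id)
    raw = hook.download(bucket=bucket, object=source_object)  # bytes
    cleaned = transform_csv(raw.decode("utf-8"))

    # The 1.10 hook uploads from a local file, so write a temporary one.
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write(cleaned)
    try:
        hook.upload(bucket=bucket, object=dest_object, filename=tmp.name)
    finally:
        os.remove(tmp.name)


def build_transform_task(dag):
    """Create the transform task; every name in op_kwargs is a placeholder."""
    from airflow.operators.python_operator import PythonOperator

    return PythonOperator(
        task_id="transform_csv",
        python_callable=transform_object,
        op_kwargs={
            "bucket": "my-bucket",              # hypothetical bucket
            "source_object": "raw/data.csv",    # hypothetical input object
            "dest_object": "clean/data.csv",    # hypothetical output object
            "conn_id": "my_gcp_conn",           # hypothetical connection ID
        },
        dag=dag,
    )
```

With something like this in place, the dependency chain in the question would become `start >> transform >> upload >> end`, where `transform = build_transform_task(dag)` and the load task's `source_objects` points at the cleaned object instead of `*.csv`. Keeping the pure function separate from the GCS I/O means the transformation logic can be checked locally before it runs inside Airflow.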