假设我想编写一个 DAG 来显示 Redshift 特定架构中的所有表。SQL
查询Show Tables;
如何为其创建 DAG? 我认为这应该是这样的:
dag = airflow.DAG(
'process_dimensions',
schedule_interval="@daily",
dagrun_timeout=timedelta(minutes=60),
default_args=args,
max_active_runs=1)
process_product_dim = SQLOperator(
task_id='process_product_dim',
conn_id='??????',
sql='Show Tables',
dag=dag)
有谁知道如何正确编写它?
因为你想要返回该查询的结果,而不仅仅是执行它,所以你需要使用 PostgresHook,特别是get_records
方法。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks import PostgresHook
def process_product_dim_py(**kwargs):
conn_id = kwargs.get('conn_id')
pg_hook = PostgresHook(conn_id)
sql = "Show Tables;"
records = pg_hook.get_records(sql)
return records
dag = DAG(
'process_dimensions',
schedule_interval="@daily",
dagrun_timeout=timedelta(minutes=60),
default_args=args,
max_active_runs=1)
process_product_dim = PythonOperator(
task_id='process_product_dim',
op_kwargs = {'conn_id':'my_redshift_connection'}
python_callable=process_product_dim_py,
dag=dag)