如何编写与 Amazon Redshift 连接的 DAG?



假设我想编写一个 DAG 来显示 Redshift 特定架构中的所有表。SQL查询Show Tables;

如何为其创建 DAG? 我认为这应该是这样的:

dag = airflow.DAG(
'process_dimensions',
schedule_interval="@daily",
dagrun_timeout=timedelta(minutes=60),
default_args=args,
max_active_runs=1)
process_product_dim = SQLOperator(
task_id='process_product_dim',
conn_id='??????',
sql='Show Tables',
dag=dag)

有谁知道如何正确编写它?

因为你想要返回该查询的结果,而不仅仅是执行它,所以你需要使用 PostgresHook,特别是get_records方法。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks import PostgresHook
def process_product_dim_py(**kwargs):
conn_id = kwargs.get('conn_id')
pg_hook = PostgresHook(conn_id)
sql = "Show Tables;"
records = pg_hook.get_records(sql)
return records
dag = DAG(
'process_dimensions',
schedule_interval="@daily",
dagrun_timeout=timedelta(minutes=60),
default_args=args,
max_active_runs=1)
process_product_dim = PythonOperator(
task_id='process_product_dim',
op_kwargs = {'conn_id':'my_redshift_connection'}
python_callable=process_product_dim_py,
dag=dag)

最新更新