How to use a list of table names and a for loop to create tasks for all tables in an Airflow DAG



I am trying to access MySQL tables through a for loop in my DAG. I pass the table names in a list, fetching the values from Airflow Variables. The code runs without errors, but the for loop does not produce results for every table; only the last table in the list actually gets processed.

I can't see where I went wrong. Could someone help me out?

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
import json

dag = DAG(
    'Multi_Incremental_Export',
    start_date=datetime(2021, 12, 5),
    default_args={'mysql_conn_id': 'mysql_connection', 'provide_context': True},
    catchup=False
)
tab1 = Variable.get('table1')
tab2 = Variable.get('table2')
tab3 = Variable.get('table3')
tables = [tab1,tab2,tab3]
start = DummyOperator(task_id='dummy_task1', retries=2, dag=dag)
def max_source(**kwargs):
    ti = kwargs['ti']
    request = f"select cast(max(created_at) as char) from {tab};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    output = [x for x in cursor.fetchone()]
    return output[0]

def max_metatbl(**kwargs):
    ti = kwargs['ti']
    request = f"select coalesce(max(created_at),'2021-12-05 00:00:00') from {meta_tbl};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    output = [x for x in cursor.fetchone()]
    return output[0]
def transfer(**kwargs):
    ti = kwargs['ti']
    meta_val = ti.xcom_pull(task_ids=max_meta)
    trns = MySQLToS3Operator(
        task_id='mysql_task',
        query=f"select * from Employee where created_at > '{meta_val}';",
        s3_bucket='mydta',
        s3_key=f'{tab}-{val}.csv',
        mysql_conn_id='mysql_connection',
        aws_conn_id='aws_connection',
        file_format='csv',
        pd_kwargs={'header': False}
    )
    trns.execute(dict())

def mx_update(**kwargs):
    ti = kwargs['ti']
    meta_max = ti.xcom_pull(task_ids=mx_src)
    request = f"insert into {meta_tbl} values('{meta_max}');"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    connection.commit()
for tab in tables:
    meta_tbl = f'{tab}' + '_Metatable'
    max_meta = f'{tab}' + '_maxmeta'
    meta_updt = f'{tab}' + '_metaupdate'
    mx_src = f'{tab}' + '_maxsrc'
    mysql_task = f'{tab}' + '_mysql_task'
    val = datetime.now().strftime("%m-%d-%Y, %H.%M")
    t1 = PythonOperator(
        task_id=mx_src,
        python_callable=max_source,
        provide_context=True,
        dag=dag
    )
    t2 = PythonOperator(
        task_id=max_meta,
        python_callable=max_metatbl,
        provide_context=True,
        dag=dag
    )
    t3 = PythonOperator(
        task_id=mysql_task,
        python_callable=transfer,
        provide_context=True,
        dag=dag
    )
    t4 = PythonOperator(
        task_id=meta_updt,
        python_callable=mx_update,
        provide_context=True,
        dag=dag
    )
    start >> t1 >> t2 >> t3 >> t4

I finally got the expected output. The change I made was to pass the table name to each PythonOperator as an argument via op_kwargs, and to add a matching parameter to each of the called functions.
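For context on why the first version only ever produced results for the last table: the for loop reassigns module-level names such as tab, meta_tbl, max_meta and mx_src on every iteration, but the callables read those names only when Airflow executes the tasks, long after the loop has finished, so every task sees the values from the final iteration (the transfer query was also hard-coded to Employee). A minimal illustrative sketch of that Python behaviour, separate from the DAG itself:

queries = []
for tab in ['Employee', 'Student', 'Staff']:
    def build_query():
        # `tab` is looked up when build_query is called, not when it is defined,
        # and by then the loop has already moved `tab` on to its final value.
        return f"select max(created_at) from {tab};"
    queries.append(build_query)

print([q() for q in queries])
# all three functions return the 'Staff' query; op_kwargs={'tab': tab} avoids this
# by binding the current value of `tab` to each task at the moment it is created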

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
import json

dag = DAG(
    'Multi_Incremental_Export',
    start_date=datetime(2021, 12, 5),
    default_args={'mysql_conn_id': 'mysql_connection', 'provide_context': True},
    catchup=False
)
start = DummyOperator(task_id='dummy_task1', retries=2, dag=dag)
def max_source(tab, **kwargs):
    # Latest created_at value in the source table, returned (via XCom) as a string.
    ti = kwargs['ti']
    request = f"select cast(max(created_at) as char) from {tab};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    output = [x for x in cursor.fetchone()]
    return output[0]

def max_metatbl(tab, **kwargs):
    # Last exported created_at recorded in the table's metatable,
    # falling back to the initial load date when the metatable is empty.
    ti = kwargs['ti']
    meta_tbl = f'{tab}_Metatable'
    request = f"select coalesce(max(created_at),'2021-12-05 00:00:00') from {meta_tbl};"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    output = [x for x in cursor.fetchone()]
    return output[0]
def transfer(tab, **kwargs):
    # Export every row newer than the last exported timestamp to S3 as CSV.
    ti = kwargs['ti']
    max_meta = f'{tab}_maxmeta'
    meta_val = ti.xcom_pull(task_ids=max_meta)
    val = datetime.now().strftime("%m-%d-%Y, %H.%M")
    trns = MySQLToS3Operator(
        task_id='mysql_task',
        query=f"select * from {tab} where created_at > '{meta_val}';",
        s3_bucket='mydta',
        s3_key=f'{tab}-{val}.csv',
        mysql_conn_id='mysql_connection',
        aws_conn_id='aws_connection',
        file_format='csv',
        pd_kwargs={'header': False}
    )
    trns.execute(dict())

def mx_update(tab, **kwargs):
    # Record the new high-water mark for this table in its metatable.
    ti = kwargs['ti']
    mx_src = f'{tab}_maxsrc'
    meta_max = ti.xcom_pull(task_ids=mx_src)
    meta_tbl = f'{tab}_Metatable'
    request = f"insert into {meta_tbl} values('{meta_max}');"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_connection', schema='mydata')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    connection.commit()
tables = ['Employee', 'Student', 'Staff', 'Teachers']

for tab in tables:
    # Per-table task ids so every table gets its own independent chain of tasks.
    max_meta = f'{tab}_maxmeta'
    meta_updt = f'{tab}_metaupdate'
    mx_src = f'{tab}_maxsrc'
    mysql_task = f'{tab}_mysql_task'
    t1 = PythonOperator(
        task_id=mx_src,
        python_callable=max_source,
        op_kwargs={'tab': tab},
        provide_context=True,
        dag=dag
    )
    t2 = PythonOperator(
        task_id=max_meta,
        python_callable=max_metatbl,
        op_kwargs={'tab': tab},
        provide_context=True,
        dag=dag
    )
    t3 = PythonOperator(
        task_id=mysql_task,
        python_callable=transfer,
        op_kwargs={'tab': tab},
        provide_context=True,
        dag=dag
    )
    t4 = PythonOperator(
        task_id=meta_updt,
        python_callable=mx_update,
        op_kwargs={'tab': tab},
        provide_context=True,
        dag=dag
    )
    start >> t1 >> t2 >> t3 >> t4
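As a side note, the same per-table binding could presumably also be done with functools.partial instead of op_kwargs; a brief sketch under that assumption, reusing the names from the loop above:

from functools import partial

t1 = PythonOperator(
    task_id=mx_src,
    # partial() freezes this iteration's table name into the callable
    python_callable=partial(max_source, tab),
    provide_context=True,
    dag=dag
)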
