Declaring a pandas dataframe globally in Airflow



Requirement:

I am working with the following 3 dataframes:

df1 - result of a query that gets the number of records in each table before the data load

df2 - result of a query that gets the number of records in each table after the data load

df - merge of df1 and df2

Code:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
import pendulum
import pandas as pd
import logging

local_tz = pendulum.timezone("my_time_zone")
local_time = pendulum.now("my_time_zone")
date = local_time.to_date_string()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### setting default arguments
default_args = {
    'owner': 'owner',
    'start_date': datetime(2021, 1, 1, tzinfo=local_tz),
    'email': ['xxx@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

sf_hook = SnowflakeHook(snowflake_conn_id="my_conn_id")
sf_con = sf_hook.get_conn()

df1 = pd.DataFrame()  ## empty dataframe

def get_before_load_count(**context):
    global df1
    query1 = """SELECT 'att1' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE1
                UNION ALL
                SELECT 'att2' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE2
                UNION ALL
                SELECT 'att3' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE3
                UNION ALL
                SELECT 'att4' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE4
                UNION ALL
                SELECT 'att5' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE5"""
    df1 = df1.append(pd.read_sql(query1, sf_con), ignore_index=True)
    print("df1", df1)  ## I am able to see the record counts

def send_success_notification(**context):
    global df1
    query = """SELECT 'att1' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE1
               UNION ALL
               SELECT 'att2' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE2
               UNION ALL
               SELECT 'att3' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE3
               UNION ALL
               SELECT 'att4' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE4
               UNION ALL
               SELECT 'att5' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE5"""
    df2 = pd.read_sql(query, sf_con)
    print("df2", df2)
    print("my df1", df1)
    df3 = pd.merge(df1, df2, on="ATTRIBUTE")
    df = df3.reindex(["ATTRIBUTE", "RECORD_COUNT_BEFORE_LOAD", "RECORD_COUNT_AFTER_LOAD"], axis=1)
    print("df", df)
    html_table = df.to_html(index=False, justify='center')
    op = EmailOperator(
        task_id='success_email',
        to='xxx@example.com',
        subject='Email subject ' + date,
        html_content="<p>Hi,<br><br>Process Completed<br><br> {}".format(html_table),
        dag=dag
    )
    op.execute(context)

with DAG('sample_dag', schedule_interval=None, max_active_runs=1, catchup=False,
         default_args=default_args) as dag:
    before_load = PythonOperator(
        task_id="before_load",
        python_callable=get_before_load_count,
        provide_context=True,
        dag=dag
    )
    load_data = SnowflakeOperator(
        task_id='load_sample_data',
        sql=['CALL procedure_name()'],
        snowflake_conn_id='sf_con_id',
        database='DB',
        schema='SCHEMA',
        warehouse='WAREHOUSE',
        role='ROLE',
        dag=dag
    )
    send_success_email = PythonOperator(
        task_id="send_success_email",
        python_callable=send_success_notification,
        provide_context=True,
        dag=dag
    )

    before_load >> load_data >> send_success_email

Problem: when I merge df1 and df2, the code throws a KeyError, indicating that df1 is empty.

Appreciate any help!

Each task in Airflow runs in a separate process (possibly on a different machine), so you cannot use variables to share data between tasks. That is why your global df1 is empty again by the time send_success_notification runs: the assignment in get_before_load_count happened in a different process.

For small amounts of metadata you can use the built-in XCom: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
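In your case df1 only holds five rows of counts, so XCom is enough. A minimal sketch against the code above (the JSON round-trip via to_json/read_json is my choice of serialization, not something Airflow requires):

def get_before_load_count(**context):
    # query1 is the same before-load count query as in the question
    df1 = pd.read_sql(query1, sf_con)
    # The callable's return value is pushed to XCom under the key 'return_value'
    return df1.to_json(orient="records")

def send_success_notification(**context):
    df2 = pd.read_sql(query, sf_con)
    # Pull what the upstream task returned and rebuild the dataframe
    df1 = pd.read_json(context["ti"].xcom_pull(task_ids="before_load"), orient="records")
    df = pd.merge(df1, df2, on="ATTRIBUTE")
    # ...reindex / to_html / EmailOperator as in the question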

For larger dataframes you have to store the data on external storage yourself (for example, export it to GCS/S3), or you can use a custom XCom backend (which can likewise be backed by external storage).
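If you go the custom XCom backend route (Airflow 2.x), the usual pattern is to subclass BaseXCom and spill dataframes to external storage during serialization, keeping only a pointer in the metadata database. A rough sketch, where the S3 path and the parquet round-trip are my assumptions; you would point the core.xcom_backend setting in airflow.cfg at this class:

from airflow.models.xcom import BaseXCom
import pandas as pd
import uuid

class PandasXComBackend(BaseXCom):
    # Hypothetical location; point this at storage all workers can reach
    BASE_PATH = "s3://my-bucket/xcom"

    @staticmethod
    def serialize_value(value, **kwargs):
        if isinstance(value, pd.DataFrame):
            # Spill the dataframe to external storage, keep only a pointer
            path = "{}/{}.parquet".format(PandasXComBackend.BASE_PATH, uuid.uuid4())
            value.to_parquet(path)  # needs s3fs/pyarrow installed
            value = {"__pandas_path__": path}
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, dict) and "__pandas_path__" in value:
            # Rehydrate the dataframe from the stored pointer
            return pd.read_parquet(value["__pandas_path__"])
        return value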

I would recommend flushing the data to disk as a file the Airflow workers can reach over the network (a CSV file, for example) and having your tasks load it back into a dataframe. That will get you to your goal with the least effort.
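Applied to the question's code, that could look roughly like this (the /shared/airflow path is a stand-in for whatever network-visible mount your workers share, and the connection is opened inside each task rather than at module import time):

COUNTS_PATH = "/shared/airflow/record_counts_before_load.csv"  # hypothetical shared mount

def get_before_load_count(**context):
    sf_con = SnowflakeHook(snowflake_conn_id="my_conn_id").get_conn()
    df1 = pd.read_sql(query1, sf_con)      # same before-load query as above
    df1.to_csv(COUNTS_PATH, index=False)   # persist for the downstream task

def send_success_notification(**context):
    sf_con = SnowflakeHook(snowflake_conn_id="my_conn_id").get_conn()
    df2 = pd.read_sql(query, sf_con)       # same after-load query as above
    df1 = pd.read_csv(COUNTS_PATH)         # reload what the first task wrote
    df = pd.merge(df1, df2, on="ATTRIBUTE")
    # ...reindex / to_html / EmailOperator as in the question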
