气流Dag任务失败全局变量未识别



我有以下内容:

def fetch_sf_data():
response1 = requests.get("https://company.my.salesforce.com/"+ReportID1+"?export=1&enc=UTF-8&xf=csv",
headers = sf.headers, cookies = {'sid' : sid})
global salesforce_report
salesforce_report_raw = pd.read_csv(io.StringIO(response1.text))
salesforce_report = salesforce_report_raw[:-5]
def push_to_sql(salesforce_report):
salesforce_report.to_sql('Daily_Report_SF',engine,if_exists='replace' ,index=False)
t1 = PythonOperator(
task_id='fetch_sf_data',
python_callable=fetch_sf_data,
dag = dag 
)
t2 = PythonOperator(
task_id='push_to_sql',
python_callable=push_to_sql,
dag=dag
)
t1 >> t2

任务1运行完美,但任务2失败并返回以下错误代码:

TypeError: push_to_sql() missing 1 required positional argument: 'salesforce_report'

我的印象是,因为我将"salesforce_report"声明为全局变量,它会毫无问题地传递到下一个任务。现在我用的不是气流吗?为了使任务2不失败,我应该做些什么?

谢谢你的帮助!

这行不通。这是因为在Airflow中,每个任务都可以在不同的机器上工作,因此您应该将结果保存在Xcom或S3/GCS或本地文件系统中。然后在下一个任务中读取此文件并推送到SQL。

最新更新