大家好,我有一个函数
def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = ds['client']
db = the_db['misc-server']
collection = db.campaigntypes
campaign = list(collection.find({}))
for item in campaign:
if item['active'] == False:
# storing false 'active' campaigns
result = "'{}' active status set to False".format(item['text'])
logging.info("'{}' active status set to False".format(item['text']))
映射到气流任务
get_campaign_active = PythonOperator(
task_id='get_campaign_active',
provide_context=True,
python_callable=get_campaign_active,
xcom_push=True,
op_kwargs={'client': client_production},
dag=dag)
可以看到,我将client_production
变量与任务一起传递给op_kwargs。当任务在气流中运行时,希望这个变量通过函数中的'**kwargs'参数传递进来。
但是为了测试,当我尝试像这样调用函数
get_campaign_active({"client":client_production})
client_production变量在ds
参数中找到。我没有一个用于气流测试的登台服务器,但是有人能告诉我,如果我将这个函数/任务部署到气流,它会从ds
或kwargs
读取client_production变量吗?
现在,如果我试图访问kwargs中的'client'键,kwargs是空的。
感谢你应该这样做:
def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = kwargs['client']
ds
(在设置provide_context=True
时,所有其他宏都传递给kwargs,您可以像以前那样使用命名参数,也可以让ds也传递给kwargs)
因为在你的代码中,你实际上没有使用ds或任何其他宏,你可以改变你的函数签名为get_campaign_active(**kwargs)
和删除provide_context=True
。注意,从Airflow>=2.0
开始,根本不需要provide_context=True
。