将从功能参数中的ds或**kwargs读取气流



大家好,我有一个函数

def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = ds['client']
db = the_db['misc-server']
collection = db.campaigntypes
campaign = list(collection.find({})) 
for item in campaign:
if item['active'] == False:
# storing false 'active' campaigns
result = "'{}' active status set to False".format(item['text'])
logging.info("'{}' active status set to False".format(item['text']))

映射到气流任务

get_campaign_active = PythonOperator(
task_id='get_campaign_active',
provide_context=True,
python_callable=get_campaign_active,
xcom_push=True,
op_kwargs={'client': client_production},
dag=dag)

可以看到,我将client_production变量与任务一起传递给op_kwargs。当任务在气流中运行时,希望这个变量通过函数中的'**kwargs'参数传递进来。

但是为了测试,当我尝试像这样调用函数

get_campaign_active({"client":client_production})

client_production变量在ds参数中找到。我没有一个用于气流测试的登台服务器,但是有人能告诉我,如果我将这个函数/任务部署到气流,它会从dskwargs读取client_production变量吗?

现在,如果我试图访问kwargs中的'client'键,kwargs是空的。

感谢

你应该这样做:

def get_campaign_active(ds, **kwargs):
logging.info('Checking for inactive campaign types..')
the_db = kwargs['client']

ds(在设置provide_context=True时,所有其他宏都传递给kwargs,您可以像以前那样使用命名参数,也可以让ds也传递给kwargs)

因为在你的代码中,你实际上没有使用ds或任何其他宏,你可以改变你的函数签名为get_campaign_active(**kwargs)和删除provide_context=True。注意,从Airflow>=2.0开始,根本不需要provide_context=True

最新更新