如何在PostgresOperator气流中使用for循环传递参数



我正在使用PostgresOperator,我想传递表名后缀到我的SQL查询,所以当它查询数据时,它从for循环迭代中动态读取

for country in countries:
matchTimeStamp = ShortCircuitOperator(task_id='Match_Updated_dates_{}'.format(country), provide_context=True,
python_callable=match_dates,op_kwargs={'key1': country}, default_args=default_args)

所以你可以看到我在task_id中传递了。format(country)。我想做类似的东西通过传递国家名称,如在下面的SQL语句,但似乎气流不喜欢它。请建议一个正确的方法最后,我在SQL语句的末尾传递了。format country

import_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql='''
unload ('select * from angaza_public_{}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
DELIMITER ','
HEADER
PARALLEL OFF
'''.format(country))

-----更新,我能够找到一个解决方案--------我在。格式(国家,国家)中添加了一个额外的国家关键字

import_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql='''
unload ('select * from angaza_public_{}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
credentials 'aws_access_key_id=AKIA6J7OV4FRSYH6DIXL;aws_secret_access_key=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'
DELIMITER ','
HEADER
PARALLEL OFF
'''.format(country, country))

你有一个额外的括号,使它不能工作。此外,我认为f-string比。format更具可读性。这样,它就可以工作了:

import_redshift_table = PostgresOperator(
task_id=f'copy_data_from_redshift_{country}',
postgres_conn_id='postgres_default',  # this is not necessary if its the default
sql=f"""
unload ('select * from angaza_public_{country}.accounts')
to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{country}.csv'
credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
DELIMITER ','
HEADER
PARALLEL OFF
"""

顺便说一下,使用IAM Role而不是凭据从Redshift卸载是一个很好的做法,这样它们就不会出现在日志中。

相关内容

  • 没有找到相关文章

最新更新