气流自定义操作变量



我需要通过气流连接设置(AWS, Postgres)到docker容器环境变量
我试图使用自定义操作符和BaseHook来做到这一点。

class S3ToPostgresDockerOperator(DockerOperator):
@apply_defaults
def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
super(S3ToPostgresDockerOperator, self).__init__(**kwargs)
self.aws_conn = BaseHook.get_connection(aws_conn_id)
self.pg_conn = BaseHook.get_connection(postgres_conn_id)

有可能这样做吗?如果不能,我应该怎么做?

java_unpack_csv = S3ToPostgresDockerOperator(
...
environment={
'AWS_ACCESS_KEY': '{{ ??? }}',
'AWS_SECRET_KEY': '{{ ??? }}'
}
)

您可以构建在DockerOperator构造函数中传递的环境。

例如,

class S3ToPostgresDockerOperator(DockerOperator):
@apply_defaults
def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
self.aws_conn = BaseHook.get_connection(aws_conn_id)
self.pg_conn = BaseHook.get_connection(postgres_conn_id)

credentials = self.aws_conn.get_credentials()
kwargs['environment'] = dict(
kwargs.pop('environment', {}),
AWS_ACCESS_KEY=credentials.access_key,
AWS_SECRET_KEY=credentials.secret_key,
PG_DATABASE_URI=self.pg_conn.get_uri()
)
super(S3ToPostgresDockerOperator, self).__init__(**kwargs)

相关内容

  • 没有找到相关文章

最新更新