I need to pass Airflow connection settings (AWS, Postgres) to a Docker container as environment variables.
I'm trying to do this with a custom operator and BaseHook.
class S3ToPostgresDockerOperator(DockerOperator):
    @apply_defaults
    def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
        super(S3ToPostgresDockerOperator, self).__init__(**kwargs)
        self.aws_conn = BaseHook.get_connection(aws_conn_id)
        self.pg_conn = BaseHook.get_connection(postgres_conn_id)
Is it possible to do something like this? If not, how should I go about it?
java_unpack_csv = S3ToPostgresDockerOperator(
    ...
    environment={
        'AWS_ACCESS_KEY': '{{ ??? }}',
        'AWS_SECRET_KEY': '{{ ??? }}'
    }
)
You can build the environment yourself and pass it to the DockerOperator constructor.
For example,
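# Import paths as in Airflow 1.10; they differ in later versions.
from airflow.hooks.base_hook import BaseHook
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.decorators import apply_defaults


class S3ToPostgresDockerOperator(DockerOperator):
    @apply_defaults
    def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
        self.aws_conn = BaseHook.get_connection(aws_conn_id)
        self.pg_conn = BaseHook.get_connection(postgres_conn_id)
        # A Connection object has no get_credentials() method. This assumes the
        # AWS access key ID is stored in `login` and the secret key in `password`.
        # Merge with any environment the caller passed; the values below win on conflict.
        kwargs['environment'] = dict(
            kwargs.pop('environment', {}),
            AWS_ACCESS_KEY=self.aws_conn.login,
            AWS_SECRET_KEY=self.aws_conn.password,
            PG_DATABASE_URI=self.pg_conn.get_uri()
        )
        # Call the parent constructor last so it receives the merged environment.
        super(S3ToPostgresDockerOperator, self).__init__(**kwargs)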
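
If you want to check the merge behaviour without an Airflow installation, here is a minimal sketch of the same idea as a plain function. FakeConnection and build_environment are hypothetical names made up for this illustration (they are not part of Airflow), and the key and URI values are placeholders.

```python
from collections import namedtuple

# Hypothetical stand-in for an Airflow Connection, so the sketch runs without Airflow.
FakeConnection = namedtuple("FakeConnection", ["login", "password", "uri"])


def build_environment(user_env, aws_conn, pg_conn):
    """Merge caller-supplied variables with values read from the connections."""
    env = dict(user_env or {})  # copy so the caller's dict is not mutated
    # Connection-derived values are applied last, so they override duplicates.
    env.update(
        AWS_ACCESS_KEY=aws_conn.login,
        AWS_SECRET_KEY=aws_conn.password,
        PG_DATABASE_URI=pg_conn.uri,
    )
    return env


# Placeholder values, for illustration only.
aws = FakeConnection(login="AKIAEXAMPLE", password="example-secret", uri=None)
pg = FakeConnection(login="user", password="pw", uri="postgres://user:pw@db:5432/app")

user_env = {"JAVA_OPTS": "-Xmx512m", "AWS_ACCESS_KEY": "stale"}
env = build_environment(user_env, aws, pg)
print(sorted(env))
```

Variables the caller passes (JAVA_OPTS here) are kept, while a caller-supplied AWS_ACCESS_KEY is replaced by the value from the connection. That is the same precedence the dict(...) call in the operator gives you.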