Flink:使用StreamingFileSink时设置ACL



我正在尝试将我的Flink作业(在EMR上运行的v1.8(从使用BucketingSink转换到新的StreamingFileSink。

我已经运行了新代码,几乎所有的东西看起来都很好。文件被写入S3并转换为完整。唯一的问题是S3的ACL没有像旧代码那样设置。

我的core-site.xml像一样

<configuration>
<property>
<name>fs.s3a.acl.default</name>
<value>BucketOwnerFullControl</value>
</property>
</configuration>

我还在StreamingFileSink生成器的forRowFormat()参数中使用s3a://作为路径的前缀。

此外,当切换到StreamingFileSink时,我不得不在我的build.gradle 中添加一个新的依赖项

flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"

当我使用BucketingSink api时,我不太清楚我是如何在没有这个jar的情况下使用s3a://前缀来编写S3的。不知怎么的,我现在写S3的方式不尊重我的core-site.xml设置。

我通过多次尝试和错误发现,在我的flink-conf.yml中添加以下行解决了这个问题。

fs.s3a.acl.default: BucketOwnerFullControl

最新更新