以编程方式配置Flink中的S3选项



显然Flink 1.14.0在编程设置S3选项时不能正确翻译。我创建了一个本地环境,像这样连接到本地MinIO实例:

val flinkConf = new Configuration()
flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")
flinkConf.setString("s3.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)

然后StreamingFileSink失败了,有一个巨大的堆栈跟踪,其中最相关的消息是Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:,这意味着Hadoop试图枚举所有的凭据提供者,而不是使用配置中的一组。我做错了什么?

我也花了很长时间试图弄清楚这一点。我找不到以编程方式设置它的方法,但在我的Flink java项目根中添加以下内容到src/main/resources/core-site.xml最终有效:

<?xml version="1.0"?>
<configuration>
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
</property>
</configuration>

然后我可以使用AWS_PROFILE env var来选择存储的凭据。这是Flink与Flink -s3-fs-hadoop 1.13.2

相关内容

  • 没有找到相关文章

最新更新