Apparently Flink 1.14.0 does not correctly translate S3 options when they are set programmatically. I create a local environment that connects to a local MinIO instance like this:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val flinkConf = new Configuration()
flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")
flinkConf.setString("s3.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)
A StreamingFileSink then fails with a huge stack trace, in which the most relevant message is:

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:

This means Hadoop tried to enumerate its full list of credential providers instead of using the single one set in the configuration. What am I doing wrong?
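For context (this is not from the question itself): the s3.* keys that the flink-s3-fs-hadoop plugin translates into Hadoop fs.s3a.* properties are normally placed in flink-conf.yaml rather than set in code. A minimal sketch for a local MinIO setup, assuming static access/secret keys instead of an anonymous provider (the credential values are placeholders):

```yaml
# flink-conf.yaml — sketch only; key names follow Flink's S3 filesystem docs
s3.endpoint: http://127.0.0.1:9000
s3.path.style.access: true   # MinIO is usually addressed by path, not virtual host
s3.access-key: minioadmin    # placeholder credentials, replace with your own
s3.secret-key: minioadmin
```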
I also spent a long time trying to figure this out. I could not find a way to set it programmatically, but adding the following to src/main/resources/core-site.xml in the root of my Flink Java project finally worked:
<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
  </property>
</configuration>
I could then use the AWS_PROFILE environment variable to select the stored credentials. This was with Flink and flink-s3-fs-hadoop 1.13.2.
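If you also need S3A itself to target MinIO, the same core-site.xml can carry the endpoint and path-style settings at the Hadoop level. A sketch, assuming MinIO's default local address; both property names are standard Hadoop S3A options:

```xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://127.0.0.1:9000</value>
  </property>
  <property>
    <!-- MinIO buckets are addressed by path, not by virtual-host-style URLs -->
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
</configuration>
```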