I am deploying a job on a Flink cluster. The S3 path looks like this:
URL: s3a://bucket-name/pre/pre1/original/2019_12_19/file.parquet.gz
I read it as follows:
val job = Job.getInstance
FileInputFormat.addInputPath(
  job,
  new org.apache.hadoop.fs.Path(url)
)
val hadoopInputFormat =
  new HadoopInputFormat(
    new AvroParquetInputFormat[GenericRecord],
    classOf[Void],
    classOf[GenericRecord],
    job
  )
val data: List[tuple.Tuple2[Void, GenericRecord]] =
  env.createInput(hadoopInputFormat).collect().asScala.toList
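The job runs fine when started with sbt, but it fails when submitted to the cluster.
Yet when the same job is deployed with an S3 URL of this form:
URL: s3a://bucket-name/2019_12_19/file.parquet.gz
it is submitted successfully.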
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
Dependencies:
"org.apache.flink" %% "flink-scala" % "1.10.0" % "provided"
"org.apache.flink" % "flink-s3-fs-hadoop" % "1.10.0"
"org.apache.flink" %% "flink-hadoop-compatibility" % "1.10.0"
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.1.1"
"org.apache.hadoop" % "hadoop-aws" % "2.7.2"
"org.apache.httpcomponents" % "httpcore" % "4.2.5"
"org.apache.httpcomponents" % "httpclient" % "4.2.5"
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0" % "provided"
Exception:
Caused by: java.io.IOException: Class class com.amazonaws.auth.InstanceProfileCredentialsProvider does not implement AWSCredentialsProvider
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:623)
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:566)
at org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:52)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:256)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:302)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:274)
at org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:349)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:396)
at org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:304)
at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:159)
at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:59)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
core-site.xml:
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>iam</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>iam</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>https://s3.us-east-1.amazonaws.com</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>
      com.amazonaws.auth.InstanceProfileCredentialsProvider,
      org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,
      com.amazonaws.auth.EnvironmentVariableCredentialsProvider
    </value>
  </property>
</configuration>
Starting with Flink 1.10, flink-s3-fs-hadoop can only be used as a plugin.
You basically need to add the jar to your Flink distribution, like this:
flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        └── flink-s3-fs-hadoop.jar
Also, remove it from your fat jar.
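One way to keep the S3 filesystem out of the fat jar is sketched below. It assumes the fat jar is built with the sbt-assembly plugin, which by default leaves "provided" dependencies out of the assembled jar; the question does not say how the jar is built, so treat that as an assumption. Simply deleting the flink-s3-fs-hadoop line from the build should have the same effect.

```scala
// build.sbt (sketch, assuming the fat jar is built with sbt-assembly)
// Scoping flink-s3-fs-hadoop as "provided" keeps it out of the fat jar,
// so the only copy at runtime is the one under plugins/s3 in the Flink dist.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"        % "1.10.0" % "provided",
  "org.apache.flink" %  "flink-s3-fs-hadoop" % "1.10.0" % "provided"
)
```

The other dependencies from the question are omitted here for brevity and would stay in the Seq as they are.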