我有hdfs数据文件,这些文件最初是由mapreduce作业创建的,输出设置如下,
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
现在我正在尝试使用 Flink DataSet API(版本 1.5.6(读取这些文件,我查看了 flink 文档,但无法弄清楚如何做到这一点。
- 在文档中,有一个API"readSequenceFile",我只是在类ExecutionEnvironment中找不到它,我可以找到"readCsvFile","readTextFile",但不是这个。
- 有一个通用的"readFile(inputFormat,path(",但我不知道inputFormat是什么,似乎这个API不接受hadoop输入格式,例如"SequenceFileAsBinaryInputFormat"。
谁能在这里提供一些启示?非常感谢。
我想你错过的是一个额外的依赖关系:"org.apache.flink" %% "flink-hadoop-compatibility" % 1.7.2
添加后,您可以运行:
val env = ExecutionEnvironment.getExecutionEnvironment
env.createInput(HadoopInputs.readSequenceFile[Long, String](classOf[Long], classOf[String], "/data/wherever"))
在此处查找有关内容和方式的更详细文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/hadoop_compatibility.html
希望有帮助