Flink,我在哪里可以找到 ExecutionEnvironment#readSequenceFile 方法



我有hdfs数据文件,这些文件最初是由mapreduce作业创建的,输出设置如下,

job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

现在我正在尝试使用 Flink DataSet API(版本 1.5.6(读取这些文件,我查看了 flink 文档,但无法弄清楚如何做到这一点。

  1. 在文档中,有一个API"readSequenceFile",我只是在类ExecutionEnvironment中找不到它,我可以找到"readCsvFile","readTextFile",但不是这个。
  2. 有一个通用的"readFile(inputFormat,path(",但我不知道inputFormat是什么,似乎这个API不接受hadoop输入格式,例如"SequenceFileAsBinaryInputFormat"。

谁能在这里提供一些启示?非常感谢。

我想你错过的是一个额外的依赖关系:"org.apache.flink" %% "flink-hadoop-compatibility" % 1.7.2

添加后,您可以运行:

val env = ExecutionEnvironment.getExecutionEnvironment
env.createInput(HadoopInputs.readSequenceFile[Long, String](classOf[Long], classOf[String], "/data/wherever"))

在此处查找有关内容和方式的更详细文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/hadoop_compatibility.html

希望有帮助

相关内容

  • 没有找到相关文章

最新更新