I can't figure out how to specify the type information in Flink when reading an input file from Hadoop. This is what I am trying:
DataSet<Tuple2<LongWritable,Text>> data =
env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0], job, );
The documentation says it needs type information as the last parameter, but I can't work out how I am supposed to provide it. Can anyone help?
Here is a short example of how to use a Hadoop InputFormat in Flink:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final String inputPath = ...
final HadoopInputFormat<LongWritable, Text> hadoopFile = HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        inputPath);

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);
Spelled out with fully qualified class names, the code is:
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> hadoopFile =
    org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile(
        new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(), // extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat
        org.apache.hadoop.io.LongWritable.class,
        org.apache.hadoop.io.Text.class,
        inputPath);

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);
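The records come back as Tuple2<LongWritable, Text>, where f0 is the byte offset of the line and f1 is the line itself. As a quick, hypothetical follow-up (continuing from the input DataSet above; MapFunction lives in org.apache.flink.api.common.functions), the Text values can be unwrapped into plain Strings:

// Drop the byte-offset key and keep only the line text.
DataSet<String> lines = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
    @Override
    public String map(Tuple2<LongWritable, Text> record) {
        return record.f1.toString(); // f1 holds the line text; f0 is the byte offset
    }
});

lines.print();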
In addition, the Maven pom needs this dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
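For completeness, here is a minimal, self-contained sketch that puts the pieces together, including the imports. The class name and the args[0] input path are just placeholders for illustration:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HadoopInputExample {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Input path; hdfs:// or file:// URIs both work here.
        final String inputPath = args[0];

        // readHadoopFile wraps the Hadoop TextInputFormat and derives the
        // Tuple2<LongWritable, Text> type information from the key/value classes,
        // so no explicit TypeInformation has to be passed.
        HadoopInputFormat<LongWritable, Text> hadoopFile = HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                inputPath);

        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);

        // Keep only the line text and print it, which also triggers execution.
        DataSet<String> lines = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String map(Tuple2<LongWritable, Text> record) {
                return record.f1.toString();
            }
        });

        lines.print();
    }
}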