How to specify TypeInformation in Flink's readHadoopFile



I can't figure out how to specify the TypeInformation in Flink when reading an input file from Hadoop. This is what I am trying:

DataSet<Tuple2<LongWritable,Text>> data =
            env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0], job, );

The documentation says it takes the type information as the last argument, but I can't work out how I am supposed to provide it. Can anyone help?

Here is a short example of how to use a HadoopInputFormat in Flink:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final String inputPath = ...
final HadoopInputFormat<LongWritable, Text> hadoopFile = HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        inputPath);
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);
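
Note that in this form you normally do not have to pass the TypeInformation yourself: the Hadoop input format wrapper already reports the Tuple2<LongWritable, Text> type it produces, so env.createInput can infer it. If you do want to spell the type out explicitly, createInput has an overload that accepts the produced TypeInformation. A minimal sketch (not from the original answer), reusing the env and hadoopFile variables above and assuming flink-hadoop-compatibility is on the classpath so Flink can build type information for Hadoop Writable types:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Build the TypeInformation for Tuple2<LongWritable, Text> by hand;
// TypeExtractor resolves the Writable types via flink-hadoop-compatibility.
TypeInformation<Tuple2<LongWritable, Text>> typeInfo = new TupleTypeInfo<>(
        TypeExtractor.getForClass(LongWritable.class),
        TypeExtractor.getForClass(Text.class));

// createInput overload that takes the produced type explicitly
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile, typeInfo);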

Written out with fully qualified class names, so it is clear which Hadoop and Flink classes are involved, the code is:

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> hadoopFile =
        org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile(
                new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(), // extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat
                org.apache.hadoop.io.LongWritable.class,
                org.apache.hadoop.io.Text.class,
                inputPath);
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopFile);
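
As a brief usage sketch (an addition, not part of the original answer): once you have the DataSet, a typical next step is to drop the LongWritable key, which holds the byte offset of each line, and keep just the Text payload:

import org.apache.flink.api.common.functions.MapFunction;

// f0 is the byte offset within the file, f1 the line of text
DataSet<String> lines = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
    @Override
    public String map(Tuple2<LongWritable, Text> value) {
        return value.f1.toString();
    }
});
lines.print(); // in the DataSet API, print() also triggers job execution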

In addition, the Maven POM needs this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
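
Note that the _2.10 suffix names the Scala version the artifact was built against; if your Flink setup uses a different Scala version (for example 2.11), pick the artifact with the matching suffix.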
