在Hadoop MapReduce中使用WholeFileInputFormat仍然会导致Mapper一次处理一行



扩展我的头在使用Hadoop 2.6..并且需要将整个文件发送到我的映射器,而不是一次发送一行。我已经按照汤姆·怀特在权威指南中的代码创建了WholeFileInputFormat和WholeFileRecordReader,但我的Mapper仍然是一次处理一行文件。有人知道我的代码中缺少什么吗?我完全按照我所看到的使用了书中的例子。如有任何指导,将不胜感激。

WholeFileInputFormat.java

public class WholeFileInputFormat extends FileInputFormat <NullWritable, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path file){
    return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
}

}

WholeFileRecordReader.java

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException{
    if (!processed){
        byte[] contents = new byte[(int) fileSplit.getLength()];
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try{
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        }finally{
            IOUtils.closeStream(in);
        }
        processed = true;
        return  true;
    }
    return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException{
    return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException{
    return value;
}
@Override
public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException{
    //do nothing :)
}

}

我的Mapreduce

的主要方法
public class ECCCount {
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Usage: ProcessLogs <input dir> <output dir>n");
      System.exit(-1);
    }
    //@SuppressWarnings("deprecation")
    Job job = new Job();
    job.setJarByClass(ECCCount.class);
    job.setJobName("ECCCount");
    //FileInputFormat.setInputPaths(job, new Path(args[0]));
    WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(ECCCountMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }

}

还有我的Mapper。现在它只是返回给定的值作为测试用例,看看它返回的是一行还是整个文件

public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      context.write(new Text(value), new IntWritable(1));
  }

}

Issue可以是mapper的输入格式。你有LongWritable和text。然而,在上面提到的例子中,他们使用了NullWritable, BytesWritable,因为这就是WholeFileInputFormat所拥有的。此外,您需要在Job类(主方法)中给出 Job . setinputformatclass (WholeFileInputFormat.class);。希望有帮助,快乐编码

多亏了Ramzy的输入,我发现了我的错误,并能够通过以下更改获得整个文件

在我的main方法中,我需要指定我需要使用的InputFormatClass。

job.setInputFormatClass(WholeFileInputFormat.class)

和我的Mapper需要期望正确的类型作为输入

public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable>{

这两个更改成功地将整个文件的一个字节[]发送到我的映射器,在那里我可以根据需要对其进行操作。

最新更新