NullPointerException in hadoop.mapreduce.lib.input.FileInputFormat.getBlockIndex when chaining two jobs

I am trying to build an inverted index.

To do this, I chain two jobs.

Basically, the first job parses the input, cleans it up, and stores the results in a folder 'output', which serves as the input folder for the second job.

The second job is supposed to actually build the inverted index.

When I ran only the first job, it worked fine (at least, no exceptions were thrown).

I chain the two jobs like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Main {
    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];
        String stopWordsPath = args[2];
        String finalOutputPath = args[3];
        Configuration conf = new Configuration();    
        conf.set("job.stopwords.path", stopWordsPath);
        Job job = Job.getInstance(conf, "Tokenize");
        job.setJobName("Tokenize");
        job.setJarByClass(TokenizerMapper.class);
        job.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PostingListEntry.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PostingListEntry.class);
        job.setOutputFormatClass(MapFileOutputFormat.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(TokenizerReducer.class);
        // Delete the output directory if it exists already.
        Path outputDir = new Path(outputPath);
        FileSystem.get(conf).delete(outputDir, true);
        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
        //-------------------------------------------------------------------------
        Configuration conf2 = new Configuration();    
        Job job2 = Job.getInstance(conf2, "BuildIndex");
        job2.setJobName("BuildIndex");
        job2.setJarByClass(InvertedIndexMapper.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job2, new Path(outputPath));
        FileOutputFormat.setOutputPath(job2, new Path(finalOutputPath));
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(PostingListEntry.class);
        job2.setMapperClass(InvertedIndexMapper.class);
        job2.setReducerClass(InvertedIndexReducer.class);
        // Delete the output directory if it exists already.
        Path finalOutputDir = new Path(finalOutputPath);
        FileSystem.get(conf2).delete(finalOutputDir, true);
        startTime = System.currentTimeMillis();
        // THIS LINE GIVES ERROR: 
        job2.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
}
I get the following exception:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getBlockIndex(FileInputFormat.java:444)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:413)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at Main.main(Main.java:79)

What is wrong with this configuration, and how should I chain the jobs?

It is not clear whether you intentionally chose MapFileOutputFormat as the output format for the first job. The more common approach is to use SequenceFileOutputFormat there, with SequenceFileInputFormat as the input format in the second job.

At the moment you have specified MapFileOutputFormat as the output of the first job and no input format in the second job, so the second job falls back to TextInputFormat, which is unlikely to work.
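
For illustration, here is a minimal sketch of that change against the driver code in the question. It assumes PostingListEntry implements Writable; only the two format-related lines change:

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// First job: persist the Text/PostingListEntry pairs as a SequenceFile
// instead of a MapFile.
job.setOutputFormatClass(SequenceFileOutputFormat.class);

// Second job: read the SequenceFile back explicitly; if this line is
// omitted, Hadoop falls back to TextInputFormat.
job2.setInputFormatClass(SequenceFileInputFormat.class);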

Looking at your TokenizerReducer class, the signature of the reduce method is incorrect. You have:

public void reduce(Text key, Iterator<PostingListEntry> values, Context context)

It should be:

public void reduce(Text key, Iterable<PostingListEntry> values, Context context)

Because of this mismatch the framework never calls your implementation, so you just get an identity reduce.
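
As a sketch, the corrected class would look like this. The loop body is hypothetical; the point is the signature, plus the @Override annotation, which makes the compiler reject a method that does not actually override Reducer.reduce:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TokenizerReducer extends Reducer<Text, PostingListEntry, Text, PostingListEntry> {
    @Override
    public void reduce(Text key, Iterable<PostingListEntry> values, Context context)
            throws IOException, InterruptedException {
        // Hypothetical body: emit every posting for this term.
        for (PostingListEntry entry : values) {
            context.write(key, entry);
        }
    }
}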
