Second MR job does not terminate in Hadoop



I have a use case where the output of the first MR job should become the input of the second MR job. I implemented this in Hadoop using ControlledJob, but at the end of the run I get the exception below.

java.lang.IllegalStateException: Job in state RUNNING instead of DEFINE
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:294)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1288)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:335)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:240)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.runMRJobs(JoinClickImpressionDetailJob.java:353)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.run(JoinClickImpressionDetailJob.java:421)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.main(JoinClickImpressionDetailJob.java:309)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Source code
public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new JoinClickImpressionDetailJob(), args);
        System.exit(exitCode);
    }
    private static int runMRJobs(String[] args) {
        int result = -1;
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        ControlledJob mrJob1 = null;
        Job firstJob = null;
        try {
            deleteDirectory(args[2], conf);
            mrJob1 = new ControlledJob(conf);
            mrJob1.setJobName("IMPRESSION_CLICK_COMBINE_JOB");
            firstJob = mrJob1.getJob();
            result += firstMapReduceJob(args, firstJob); // calls waitForCompletion, so the job is submitted and run here
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("First Job Finished=============");
        System.out.println("Second Job Started=============");
        ControlledJob mrJob2 = null;
        try {
            mrJob2 = new ControlledJob(conf);
            deleteDirectory(args[3], conf);
            mrJob2.addDependingJob(mrJob1);
            mrJob2.setJobName("IMPRESSION_CLICK_COMBINE_JOB1");
            Job job2 = mrJob2.getJob();
            result += secondMapReduceJob(args, job2); // also calls waitForCompletion, submitting the second job here
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Second Job Finished=============");
        JobControl jobControl = new JobControl("Click-Impression-aggregator");
        jobControl.addJob(mrJob1);
        jobControl.addJob(mrJob2);
        jobControl.run(); // tries to submit both jobs again, but they are already RUNNING
        return result;
    }
    private static int secondMapReduceJob(String[] args, Job job2) throws IOException, InterruptedException, ClassNotFoundException {
        long startTime = System.currentTimeMillis();
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setJarByClass(JoinClickImpressionDetailJob.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setReducerClass(ImpressionAndClickReducer.class);
        FileInputFormat.setInputDirRecursive(job2, true);
        FileInputFormat.addInputPath(job2, new Path(args[2]));
        job2.setMapperClass(ImpressionClickMapper.class);
        FileOutputFormat.setOutputPath(job2, new Path(args[3]));
        job2.setNumReduceTasks(8);
        job2.setPartitionerClass(ClickNonClickPartitioner.class);
        System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
        return job2.waitForCompletion(true) ? 1 : 0;
    }
    private static int firstMapReduceJob(String[] args, Job job) throws IOException, InterruptedException, ClassNotFoundException {
        long startTime = System.currentTimeMillis();
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(JoinClickImpressionDetailJob.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setReducerClass(ImpressionClickReducer.class);
        FileInputFormat.setInputDirRecursive(job, true);
        /**
         * Here directory of impressions will be present
         */
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
        /**
         * Here directory of clicks will be present
         */
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setNumReduceTasks(10);
        job.setPartitionerClass(TrackerPartitioner.class);
        System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
        return job.waitForCompletion(true) ? 1 : 0;
    }
    private static void deleteDirectory(String path, Configuration conf) throws IOException {
        Path p = new Path(path);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(p)) {
            fs.delete(p, true);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        return runMRJobs(args);
    }

Full code: https://github.com/ragnar-lothbrok/hadoop-demo/blob/master/src/main/java/com/hadoop/intellipaat/JoinClickImpressionDetailJob.java

Update the code as follows. The exception happens because firstMapReduceJob and secondMapReduceJob already call waitForCompletion(true), which submits and runs each job; when jobControl.run() later tries to submit the same Job instances, they are in the RUNNING state instead of DEFINE. Since you already wait for each job to finish, you can implement this without a controlled job at all:

private int runMRJobs(String[] args) throws IOException {
    int result = -1;
    // getConf() is an instance method inherited from Configured, so runMRJobs is no longer static
    Configuration conf = getConf();
    conf.set("mapreduce.output.fileoutputformat.compress", "true");
    conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
    Job job1 = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
    try {
        deleteDirectory(args[2], conf);
        result += firstMapReduceJob(args, job1);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("First Job Finished=============");
    System.out.println("Second Job Started=============");
    Job job2 = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB1");
    try {
        deleteDirectory(args[3], conf);
        result += secondMapReduceJob(args, job2);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("Second Job Finished=============");
    return result;
}
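
With this sequential version, job1.waitForCompletion(true) inside firstMapReduceJob blocks until the first job has committed its output under args[2], so the second job can safely read that directory as its input; no JobControl dependency tracking is needed.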

In your first job, return as follows:

return job1.waitForCompletion(true) ? 0 : 1;

And in your second job, return as follows:

return job2.waitForCompletion(true) ? 0 : 1;
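
Alternatively, if you want to keep JobControl, the helpers must not submit the jobs themselves: remove the waitForCompletion(true) calls so they only configure the jobs, and let JobControl drive submission. A minimal sketch, assuming hypothetical configureFirstJob/configureSecondJob helpers that are your existing methods minus the waitForCompletion calls:

// Uses the same imports as the original class:
// org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob
// org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl

private int runWithJobControl(String[] args, Configuration conf) throws Exception {
    ControlledJob mrJob1 = new ControlledJob(conf);
    mrJob1.setJobName("IMPRESSION_CLICK_COMBINE_JOB");
    configureFirstJob(args, mrJob1.getJob());   // hypothetical: firstMapReduceJob minus the waitForCompletion call

    ControlledJob mrJob2 = new ControlledJob(conf);
    mrJob2.setJobName("IMPRESSION_CLICK_COMBINE_JOB1");
    mrJob2.addDependingJob(mrJob1);             // JobControl submits job2 only after job1 succeeds
    configureSecondJob(args, mrJob2.getJob());  // hypothetical: secondMapReduceJob minus the waitForCompletion call

    JobControl jobControl = new JobControl("Click-Impression-aggregator");
    jobControl.addJob(mrJob1);
    jobControl.addJob(mrJob2);

    // JobControl.run() keeps polling until stop() is called, so drive it from its own thread
    Thread controller = new Thread(jobControl);
    controller.start();
    while (!jobControl.allFinished()) {
        Thread.sleep(1000);
    }
    jobControl.stop();
    return jobControl.getFailedJobList().isEmpty() ? 0 : 1;
}

Either way fixes the IllegalStateException; the key point is that a Job must be submitted exactly once, either by waitForCompletion or by JobControl, never both.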
