我已经阅读了之前与此相关的文章,但没有得到任何有意义的内容。
我的用例是:
- 汇总印象和点击数据
- 在不同的文件中分离已点击和未点击的数据。
我已经写了mapper和reducer,但那个reducer的输出是包含点击&没有点击,它在同一个文件中。我想分开的数据,所以点击的数据应该出现在一个文件和非点击应该出现在另一个文件。
错误:
java.lang.IllegalStateException: Reducer has been already set
at org.apache.hadoop.mapreduce.lib.chain.Chain.checkReducerAlreadySet(Chain.java:662)
Configuration conf = new Configuration();
conf.set("mapreduce.output.fileoutputformat.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
Job job = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setReducerClass(ImpressionClickReducer.class);
FileInputFormat.setInputDirRecursive(job, true);
// FileInputFormat.addInputPath(job, new Path(args[0]));
// job.setMapperClass(ImpressionMapper.class);
Path p = new Path(args[2]);
FileSystem fs = FileSystem.get(conf);
fs.exists(p);
fs.delete(p, true);
/**
* Here directory of impressions will be present
*/
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
/**
* Here directory of clicks will be present
*/
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setNumReduceTasks(10);
job.setPartitionerClass(TrackerPartitioner.class);
ChainReducer.setReducer(job, ImpressionClickReducer.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
ChainReducer.addMapper(job, ImpressionClickMapper.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
//Below mentioned line is giving Error
//ChainReducer.setReducer(job, ImpressionAndClickReducer.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
job.waitForCompletion(true);
ChainReducer用于在Reducer之后链接Map任务,您只能调用setReducer()
一次(参见这里的代码)。
来自Javadocs:
ChainReducer类允许将多个Mapper类在
使用ChainMapper和ChainReducer类可以组合Map/Reduce作业,看起来像[Map +/Reduce Map *]。这种模式的直接好处是磁盘IO的显著减少。
所以这个想法是你设置一个单一的Reducer,然后在那之后链Map操作。
听起来你实际上想使用MultipleOutputs。Hadoop javadoc提供了如何使用它的示例。有了这个,你可以定义多个输出,它取决于你的输出键/值被写入。