My task is to count the hashtags in tweets and then display the 15 most frequent ones.
When I run it locally from the test class it works fine, but on the cluster it only outputs the last computed entry.
Mapper
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HashtagMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // keep only letters, digits and '#', then drop orphaned '#' characters
        String cleanValue = value.toString().replaceAll("[^A-Za-z0-9#]", " ");
        cleanValue = cleanValue.replace("# ", " ");
        StringTokenizer itr = new StringTokenizer(cleanValue, " ");
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken().trim());
            // emit (hashtag, 1) for every token that starts with '#'
            if (word.find("#", 0) == 0) {
                context.write(new Text(word), ONE);
            }
        }
    }
}
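For reference, here is a minimal MRUnit sketch (MRUnit is what the test class further down already uses) of what this mapper emits on its own; the tweet text is made up purely for illustration:

@Test
public void testMapperAlone() throws IOException {
    new MapDriver<LongWritable, Text, Text, IntWritable>()
            .withMapper(new HashtagMapper())
            .withInput(new LongWritable(0), new Text("Loving #Hadoop and #MapReduce #Hadoop"))
            // one (hashtag, 1) pair per token that starts with '#', in token order
            .withOutput(new Text("#Hadoop"), new IntWritable(1))
            .withOutput(new Text("#MapReduce"), new IntWritable(1))
            .withOutput(new Text("#Hadoop"), new IntWritable(1))
            .runTest();
}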
Reducer
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.google.common.collect.HashMultimap;

public class HashtagReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    // Guava multimap that buffers all (hashtag, total) pairs until cleanup()
    private final HashMultimap<Text, Integer> hashtags = HashMultimap.create();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        hashtags.put(key, sum);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // sort by count (descending) and emit at most the top 15 hashtags
        Map<Text, Collection<Integer>> sortedMap = sortByComparator(hashtags.asMap());
        Set<Text> keys = sortedMap.keySet();
        int index = 0;
        for (Text key : keys) {
            context.write(new Text(key), new DoubleWritable(sortedMap.get(key).iterator().next()));
            index++;
            if (index == Math.min(keys.size() - 1, 15)) {
                break;
            }
        }
    }

    private static Map<Text, Collection<Integer>> sortByComparator(Map<Text, Collection<Integer>> unsortedMap) {
        List<Map.Entry<Text, Collection<Integer>>> list = new LinkedList<Map.Entry<Text, Collection<Integer>>>(unsortedMap.entrySet());
        Collections.sort(list, Collections.reverseOrder(new Comparator<Map.Entry<Text, Collection<Integer>>>() {
            public int compare(Map.Entry<Text, Collection<Integer>> o1,
                               Map.Entry<Text, Collection<Integer>> o2) {
                return o1.getValue().iterator().next().compareTo(o2.getValue().iterator().next());
            }
        }));
        Map<Text, Collection<Integer>> sortedMap = new LinkedHashMap<Text, Collection<Integer>>();
        for (Map.Entry<Text, Collection<Integer>> entry : list) {
            sortedMap.put(entry.getKey(), entry.getValue());
        }
        return sortedMap;
    }
}
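As a side note, unrelated to the cluster problem: the break condition in cleanup() uses keys.size() - 1, so with 2 to 15 distinct hashtags the loop stops one entry early. A small sketch of a boundary check that emits exactly the top min(15, keys.size()) entries:

index++;
// break after exactly min(15, total) entries; keys.size() - 1 drops one entry for small inputs
if (index == Math.min(keys.size(), 15)) {
    break;
}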
Driver
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StubDriver {

    public static void main(String[] args) throws Exception {
        /*
         * Validate that two arguments were passed from the command line.
         */
        if (args.length != 2) {
            System.out.printf("Usage: StubDriver <input dir> <output dir>\n");
            System.exit(-1);
        }
        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = Job.getInstance();
        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to nodes in your cluster running
         * mapper and reducer tasks.
         */
        job.setJarByClass(StubDriver.class);
        /*
         * Specify an easily-decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("StubDriver");
        job.setMapperClass(HashtagMapper.class);
        job.setReducerClass(HashtagReducer.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        /*
         * Start the MapReduce job and wait for it to finish.
         * If it finishes successfully, return 0. If not, return 1.
         */
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
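One thing worth double-checking in this driver: the reducer emits DoubleWritable values, but the job declares IntWritable as the output value class. When the map and reduce output types differ, the usual Hadoop pattern is to declare them separately; a sketch, assuming that is the intent here:

// map output: (Text hashtag, IntWritable one)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// final (reducer) output: (Text hashtag, DoubleWritable count)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);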
Test
import java.io.File;
import java.io.FileInputStream;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class HashtagTest {

    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver;

    @Before
    public void setUp() {
        HashtagMapper hashtagMapper = new HashtagMapper();
        HashtagReducer hashtagReducer = new HashtagReducer();
        mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
        mapDriver.setMapper(hashtagMapper);
        mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>();
        mapReduceDriver.setMapper(hashtagMapper);
        mapReduceDriver.setReducer(hashtagReducer);
    }

    @Test
    public void testMapReducer_3() {
        try {
            // read the whole sample file and feed it to the job as a single input record
            File file = new File("/home/paul/Documents/Studium/Master Bussines Informatics/3 Semester/Business Intelligence/Part 2/tweets/small.json");
            FileInputStream fileInputStream = new FileInputStream(file);
            byte[] data = new byte[(int) file.length()];
            fileInputStream.read(data);
            fileInputStream.close();
            String str = new String(data, "UTF-8");
            mapReduceDriver
                    .withInput(new LongWritable(), new Text(str))
                    .withOutput(new Text("#Like"), new DoubleWritable(77))
                    .withOutput(new Text("#Retweet"), new DoubleWritable(75))
                    .withOutput(new Text("#Malerei"), new DoubleWritable(35))
                    .withOutput(new Text("#N"), new DoubleWritable(35))
                    .withOutput(new Text("#nagelstudio"), new DoubleWritable(35))
                    .withOutput(new Text("#nailart"), new DoubleWritable(35))
                    .withOutput(new Text("#Crystalnails"), new DoubleWritable(35))
                    .withOutput(new Text("#babyboomer"), new DoubleWritable(35))
                    .withOutput(new Text("#Geln"), new DoubleWritable(35))
                    .withOutput(new Text("#GelN"), new DoubleWritable(35))
                    .withOutput(new Text("#Muster"), new DoubleWritable(35))
                    .withOutput(new Text("#NagelstudioWien"), new DoubleWritable(35))
                    .withOutput(new Text("#GoodVibeTribe"), new DoubleWritable(24))
                    .withOutput(new Text("#1DPL"), new DoubleWritable(24))
                    .withOutput(new Text("#Przesz"), new DoubleWritable(22))
                    .runTest();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Does anyone have any tips?
I solved the problem by changing the following in the reducer:
- public void cleanup(Context context) => public void cleanup(Reducer.Context context)
- hashtags.put() => this.hashtags.put()
- sortedMap = sortByComparator(hashtags.asMap()) => sortedMap = sortByComparator(this.hashtags.asMap())
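For anyone hitting the same thing, this is what the changed methods look like with exactly those three edits applied and nothing else altered (a sketch of the reported fix; note that the raw Reducer.Context signature compiles with an unchecked warning):

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    this.hashtags.put(key, sum); // was: hashtags.put(key, sum)
}

// raw Reducer.Context instead of the generic Context alias
public void cleanup(Reducer.Context context) throws IOException, InterruptedException {
    // explicit this-qualification when reading the multimap
    Map<Text, Collection<Integer>> sortedMap = sortByComparator(this.hashtags.asMap());
    Set<Text> keys = sortedMap.keySet();
    int index = 0;
    for (Text key : keys) {
        context.write(new Text(key), new DoubleWritable(sortedMap.get(key).iterator().next()));
        index++;
        if (index == Math.min(keys.size() - 1, 15)) {
            break;
        }
    }
}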