我读到了关于mapreduce的文章,想知道一个特定的场景。假设我们有几个文件(例如fileA、fileB、fileC),每个文件都由多个整数组成。如果我们想对所有文件中的数字进行排序,以创建这样的东西:
23 fileA
34 fileB
35 fileA
60 fileA
60 fileC
映射和减少流程将如何工作?
目前,这是我所拥有的,但并不完全正确;
(fileName, fileContent) -> (map to) (Number, fileName)
对临时键、值对进行排序
(Number, (list of){fileName1, fileName2...})
减少临时配对并获得
(Number, fileName1) (Number, fileName2)
等等
问题是,在排序阶段,文件名可能不按字母顺序排列,因此reduce部分不会生成正确的输出。有人能为这种情况的正确方法提供一些见解吗?
实现这一点的最佳方法是通过二次排序。您需要对键(在您的案例编号中)和值(在您案例文件名中)进行排序。在Hadoop中,映射程序的输出仅按键排序。
这可以通过使用复合键来实现:该键是数字和文件名的组合。例如,对于第一条记录,密钥将是(23,fileA),而不是仅是(23)。
您可以在此处阅读有关二次排序的信息:https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch01.html
您还可以浏览"Hadoop最终指南"一书中的"辅助排序部分。
为了简单起见,我写了一个程序来实现同样的目的。
在该程序中,默认情况下,键由映射器进行排序。我已经编写了一个逻辑来对reducer端的值进行排序。因此,它负责对键和值进行排序,并产生所需的输出。
以下是程序:
package com.myorg.hadooptests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.*;
public class SortedValue {
public static class SortedValueMapper
extends Mapper<LongWritable, Text , Text, IntWritable>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(" ");
if(tokens.length == 2) {
context.write(new Text(tokens[1]), new IntWritable(Integer.parseInt(tokens[0])));
}
}
}
public static class SortedValueReducer
extends Reducer<Text, IntWritable, IntWritable, Text> {
Map<String, ArrayList<Integer>> valueMap = new HashMap<String, ArrayList<Integer>>();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
String keyStr = key.toString();
ArrayList<Integer> storedValues = valueMap.get(keyStr);
for (IntWritable value : values) {
if (storedValues == null) {
storedValues = new ArrayList<Integer>();
valueMap.put(keyStr, storedValues);
}
storedValues.add(value.get());
}
Collections.sort(storedValues);
for (Integer val : storedValues) {
context.write(new IntWritable(val), key);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "CompositeKeyExample");
job.setJarByClass(SortedValue.class);
job.setMapperClass(SortedValueMapper.class);
job.setReducerClass(SortedValueReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/in/in1.txt"));
FileOutputFormat.setOutputPath(job, new Path("/out/"));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
映射器逻辑:
- 分析每一行。假设键和值由一个空白字符(")分隔
- 如果该行包含2个标记,则会发出(文件名,整数值)。例如,对于第一条记录,它会发出(fileA,23)
还原器逻辑:
它将(key,value)对放在HashMap中,其中key是文件名,value是该文件的整数列表。例如,对于文件A,存储的值将是23、34和35
最后,它对特定键的值进行排序,并对每个值从reducer中发射(value,key)。例如,对于fileA,输出的记录为:(23,fileA)、(34,fileA
我为以下输入运行了这个程序:
34 fileB
35 fileA
60 fileC
60 fileA
23 fileA
我得到了以下输出:
23 fileA
35 fileA
60 fileA
34 fileB
60 fileC