我是Hadoop框架的新手,我想写一个' mapreduce ';在两个表R和s之间连接x属性的程序(HadoopJoin.java)。这两个表的结构是:
R (tag : char, x : int, y : varchar(30))
and
S (tag : char, x : int, z : varchar(30))
例如R table
:
r 10 r-10-0
r 11 r-11-0
r 12 r-12-0
r 21 r-21-0
对于S表:
s 11 s-11-0
s 21 s-41-0
s 21 s-41-1
s 12 s-31-0
s 11 s-31-1
结果应该如下所示:
r 11 r-11-0 s 11 s-11-0
etc.
有谁能帮帮我吗?
对于不熟悉这个框架的人来说,在mapreduce
中描述join是非常困难的,但是在这里我提供了一个适合您情况的工作实现,我强烈建议您阅读Hadoop The Definitive Guide 4th edition的第9节。它很好地描述了如何在mapreduce中实现Join。
首先,你可以考虑使用更高层次的框架,如Pig, Hive和Spark,因为它们在实现的核心部分提供了连接操作。
其次,根据数据的性质,有许多实现mapreduce的方法。这种方式包括映射端连接和简化端连接。在这个答案中,我已经实现了减侧连接:
实现:
首先,我们应该为两个不同的数据集使用两个不同的映射器请注意,在您的情况下,同一个映射器可以用于两个数据集但在许多情况下,您需要为不同的数据集使用不同的映射器因此,我定义了两个映射器以使此解决方案更通用:
我使用了有两个属性的TextPair
,其中一个是用于连接数据的键,另一个是指定此记录属于哪个数据集的标记。如果它属于第一个数据集,这个标记将为0。否则为1。
我已经实现了TextPair.FirstComparator
,以确保对于每个键(按键连接),第一个数据集的记录是reducer接收的第一个键。第二个数据集中所有其他带有该id的记录在此之后被接收。实际上,这行代码将为我们完成任务:
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
所以在reducer中,我们将接收到的第一个记录是来自dataset1
的记录,之后我们接收到来自dataset2
的记录。我们唯一要做的就是把这些记录写下来。
Mapper for dataset:
public class JoinDataSet1Mapper
extends Mapper<LongWritable, Text, TextPair, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] data = value.toString().split(" ");
context.write(new TextPair(data[1], "0"), value);
}
}
Mapper for DataSet2:
public class JoinDataSet2Mapper
extends Mapper<LongWritable, Text, TextPair, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] data = value.toString().split(" ");
context.write(new TextPair(data[1], "1"), value);
}
}
减速器:
public class JoinReducer extends Reducer<TextPair, Text, NullWritable, Text> {
public static class KeyPartitioner extends Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text value, int numPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
@Override
protected void reduce(TextPair key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
Text stationName = new Text(iter.next());
while (iter.hasNext()) {
Text record = iter.next();
Text outValue = new Text(stationName.toString() + "t" + record.toString());
context.write(NullWritable.get(), outValue);
}
}
}
自定义关键:
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "t" + second;
}
@Override
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}
}
JobDriver:
public class JoinJob extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "Join two DataSet");
job.setJarByClass(getClass());
Path ncdcInputPath = new Path(getConf().get("job.input1.path"));
Path stationInputPath = new Path(getConf().get("job.input2.path"));
Path outputPath = new Path(getConf().get("job.output.path"));
MultipleInputs.addInputPath(job, ncdcInputPath,
TextInputFormat.class, JoinDataSet1Mapper.class);
MultipleInputs.addInputPath(job, stationInputPath,
TextInputFormat.class, JoinDataSet2Mapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setPartitionerClass(JoinReducer.KeyPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
job.setMapOutputKeyClass(TextPair.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new JoinJob(), args);
System.exit(exitCode);
}
}