I want to write a Hadoop MapReduce join in Java



I am new to the Hadoop framework, and I want to write a MapReduce program (HadoopJoin.java) that joins two tables R and S on the attribute x. The structure of the two tables is:

R (tag : char, x : int, y : varchar(30))
and
S (tag : char, x : int, z : varchar(30))

For example, the R table:

r 10 r-10-0
r 11 r-11-0
r 12 r-12-0
r 21 r-21-0 

And the S table:

s 11 s-11-0
s 21 s-41-0
s 21 s-41-1
s 12 s-31-0
s 11 s-31-1

The result should look like this:

r 11 r-11-0 s 11 s-11-0
etc.

Can anyone help me?

Describing a join in MapReduce to someone unfamiliar with the framework is quite difficult, but here I provide a working implementation for your situation. I strongly recommend that you read Chapter 9 of Hadoop: The Definitive Guide, 4th edition. It describes very well how to implement joins in MapReduce.

First, you might consider using a higher-level framework such as Pig, Hive, or Spark, because they all provide the join operation as a core part of their implementation.
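For illustration only (this is not part of the question), a minimal sketch of the same join with Spark's Java RDD API might look like the following; the input paths, output path, and app name are placeholders, and the master is assumed to be supplied by spark-submit:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkJoinSketch {
    public static void main(String[] args) {
        // Master and other settings are expected to come from spark-submit.
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("join-sketch"));
        // Key each line by the join attribute x (the second space-separated field).
        JavaPairRDD<String, String> r = sc.textFile("R.txt")
                .mapToPair(line -> new Tuple2<>(line.split(" ")[1], line));
        JavaPairRDD<String, String> s = sc.textFile("S.txt")
                .mapToPair(line -> new Tuple2<>(line.split(" ")[1], line));
        // join() yields one (x, (rRecord, sRecord)) pair per matching combination.
        r.join(s)
         .map(t -> t._2()._1() + "\t" + t._2()._2())
         .saveAsTextFile("joined");
        sc.stop();
    }
}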

Second, there are many ways to implement a join in MapReduce, depending on the nature of your data. These include the map-side join and the reduce-side join. In this answer, I have implemented a reduce-side join:

Implementation:

First, we should use two different mappers for the two different datasets. Note that in your case the same mapper could be used for both datasets, but in many situations you need different mappers for different datasets, so I have defined two mappers to make this solution more general.

I used a TextPair with two attributes: one is the key used for joining the data, and the other is a tag that specifies which dataset this record belongs to. The tag will be 0 if the record belongs to the first dataset, and 1 otherwise.
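For example, with the sample data from the question, the two mappers would emit the following (join key, tag) pairs, each with the whole input line as its value:

(11, 0)  ->  r 11 r-11-0     (from JoinDataSet1Mapper)
(11, 1)  ->  s 11 s-11-0     (from JoinDataSet2Mapper)
(11, 1)  ->  s 11 s-31-1     (from JoinDataSet2Mapper)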

I implemented TextPair.FirstComparator to ensure that, for each join key, the record from the first dataset is the first one received by the reducer, and all other records with that key, which come from the second dataset, are received after it. Actually, this line of code will do the job for us:

job.setGroupingComparatorClass(TextPair.FirstComparator.class);

So the first record we receive in the reducer will be the record from dataset1, and after that we receive the records from dataset2. The only thing we have to do is write those records out.
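For example, for join key 11 in the sample data, the reducer receives one group whose first value is the R record, followed by the S records (the relative order of the S records among themselves is not guaranteed), and writes one joined line per S record:

group for key 11:  r 11 r-11-0, s 11 s-11-0, s 11 s-31-1
output:            r 11 r-11-0    s 11 s-11-0
                   r 11 r-11-0    s 11 s-31-1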

Mapper for DataSet1:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinDataSet1Mapper
        extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // data[1] is the join attribute x; tag "0" marks dataset 1.
        String[] data = value.toString().split(" ");
        context.write(new TextPair(data[1], "0"), value);
    }
}

Mapper for DataSet2:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinDataSet2Mapper
        extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // data[1] is the join attribute x; tag "1" marks dataset 2.
        String[] data = value.toString().split(" ");
        context.write(new TextPair(data[1], "1"), value);
    }
}

Reducer:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<TextPair, Text, NullWritable, Text> {

    // Partition on the join key only, so records with the same x from
    // both datasets reach the same reducer.
    public static class KeyPartitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        // The grouping comparator plus the tag ordering guarantee that the
        // first value is the dataset-1 record (this assumes each join key
        // appears at most once in dataset 1). Copy it, because Hadoop
        // reuses the Text instance on each call to iter.next().
        Text firstRecord = new Text(iter.next());
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outValue = new Text(firstRecord.toString() + "\t" + record.toString());
            context.write(NullWritable.get(), outValue);
        }
    }
}

Custom key:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

public class TextPair implements WritableComparable<TextPair> {

    private Text first;   // the join key (attribute x)
    private Text second;  // the dataset tag: "0" or "1"

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    // Sort by join key first, then by tag, so within one key the record
    // tagged "0" (dataset 1) always sorts before those tagged "1".
    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }

    // Compares only the first field, so it can serve as a grouping
    // comparator: all tags for the same join key form one reduce group.
    public static class FirstComparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public FirstComparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                // A serialized Text is a VInt length followed by the bytes,
                // so this computes the byte length of the first field only.
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).first.compareTo(((TextPair) b).first);
            }
            return super.compare(a, b);
        }
    }
}

JobDriver:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "Join two DataSets");
        job.setJarByClass(getClass());
        Path input1Path = new Path(getConf().get("job.input1.path"));
        Path input2Path = new Path(getConf().get("job.input2.path"));
        Path outputPath = new Path(getConf().get("job.output.path"));
        // Each input path gets its own mapper; both emit (TextPair, Text).
        MultipleInputs.addInputPath(job, input1Path,
                TextInputFormat.class, JoinDataSet1Mapper.class);
        MultipleInputs.addInputPath(job, input2Path,
                TextInputFormat.class, JoinDataSet2Mapper.class);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Partition and group on the join key only; the sort still uses the
        // full TextPair, so the tag-"0" record comes first in each group.
        job.setPartitionerClass(JoinReducer.KeyPartitioner.class);
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(JoinReducer.class);
        // The reducer emits (NullWritable, Text), so declare exactly that.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new JoinJob(), args);
        System.exit(exitCode);
    }
}
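Assuming everything is packaged into a jar (the jar name and paths below are placeholders), the job can be launched with ToolRunner's generic -D options, which is where job.input1.path and friends come from:

hadoop jar join-example.jar JoinJob \
    -D job.input1.path=/data/R \
    -D job.input2.path=/data/S \
    -D job.output.path=/data/joined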
