Why is the Hadoop shuffle not working as expected?



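I have this Hadoop MapReduce code that works on graph data (in adjacency list form). It is roughly an in-adjacency list to out-adjacency list transformation. The main code of the MapReduce task is as follows: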

public class TestTask extends Configured
implements Tool {
public static class TTMapper extends MapReduceBase
    implements Mapper<Text, TextArrayWritable, Text, NeighborWritable> {
    @Override
    public void map(Text key, 
            TextArrayWritable value,
            OutputCollector<Text, NeighborWritable> output, 
            Reporter reporter) throws IOException {
        int numNeighbors = value.get().length;
        double weight = (double)1 / numNeighbors;
        Text[] neighbors = (Text[]) value.toArray();
        NeighborWritable me = new NeighborWritable(key, new DoubleWritable(weight));
        for (int i = 0; i < neighbors.length; i++) {
            output.collect(neighbors[i], me);
        }   
    }       
}
public static class TTReducer extends MapReduceBase
    implements Reducer<Text, NeighborWritable, Text, Text> {
    @Override
    public void reduce(Text key, 
                        Iterator<NeighborWritable> values,
                        OutputCollector<Text, Text> output, 
                        Reporter arg3)
            throws IOException {
        ArrayList<NeighborWritable> neighborList = new ArrayList<NeighborWritable>();
        while(values.hasNext()) {
            neighborList.add(values.next());
        }
        NeighborArrayWritable neighbors = new NeighborArrayWritable
                            (neighborList.toArray(new NeighborWritable[0]));
        Text out = new Text(neighbors.toString());
        output.collect(key, out);
    }
}
@Override
public int run(String[] arg0) throws Exception {
    JobConf conf = Util.getMapRedJobConf("testJob",
                                         SequenceFileInputFormat.class, 
                                         TTMapper.class, 
                                         Text.class, 
                                         NeighborWritable.class, 
                                         1, 
                                         TTReducer.class, 
                                         Text.class, 
                                         Text.class, 
                                         TextOutputFormat.class, 
                                         "test/in", 
                                         "test/out");
    JobClient.runJob(conf);
    return 0;
}
public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new TestTask(), args);
    System.exit(res);
}
}

The supporting code is as follows. TextArrayWritable:

public class TextArrayWritable extends ArrayWritable {
public TextArrayWritable() {
    super(Text.class);
}
public TextArrayWritable(Text[] values) {
    super(Text.class, values);
}
}

NeighborWritable:

public class NeighborWritable implements Writable {
private Text nodeId;
private DoubleWritable weight;
public NeighborWritable(Text nodeId, DoubleWritable weight) {
    this.nodeId = nodeId;
    this.weight = weight;
}
public NeighborWritable () { }
public Text getNodeId() {
    return nodeId;
}
public DoubleWritable getWeight() {
    return weight;
}
public void setNodeId(Text nodeId) {
    this.nodeId = nodeId;
}
public void setWeight(DoubleWritable weight) {
    this.weight = weight;
}
@Override
public void readFields(DataInput in) throws IOException {
    nodeId = new Text();
    nodeId.readFields(in);
    weight = new DoubleWritable();
    weight.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
    nodeId.write(out);
    weight.write(out);
}
public String toString() {
    return "NW[nodeId=" + (nodeId != null ? nodeId.toString() : "(null)") +
        ",weight=" + (weight != null ? weight.toString() : "(null)") + "]";
}
public boolean equals(Object o) {
    if (!(o instanceof NeighborWritable)) {
        return false;
    }
    NeighborWritable that = (NeighborWritable)o;
    return (nodeId.equals(that.getNodeId()) && (weight.equals(that.getWeight())));
}
}

And the Util class:

public class Util {
public static JobConf getMapRedJobConf(String jobName,
                                              Class<? extends InputFormat> inputFormatClass,
                                              Class<? extends Mapper> mapperClass,
                                              Class<?> mapOutputKeyClass,
                                              Class<?> mapOutputValueClass,
                                              int numReducer,
                                              Class<? extends Reducer> reducerClass,
                                              Class<?> outputKeyClass,
                                              Class<?> outputValueClass,
                                              Class<? extends OutputFormat> outputFormatClass,
                                              String inputDir,
                                              String outputDir) throws IOException {
    JobConf conf = new JobConf();
    if (jobName != null)
        conf.setJobName(jobName);
    conf.setInputFormat(inputFormatClass);
    conf.setMapperClass(mapperClass);
    if (numReducer == 0) {
        conf.setNumReduceTasks(0);
        conf.setOutputKeyClass(outputKeyClass);
        conf.setOutputValueClass(outputValueClass);
        conf.setOutputFormat(outputFormatClass);
    } else {
        // may set actual number of reducers
        // conf.setNumReduceTasks(numReducer);
        conf.setMapOutputKeyClass(mapOutputKeyClass);
        conf.setMapOutputValueClass(mapOutputValueClass);
        conf.setReducerClass(reducerClass);
        conf.setOutputKeyClass(outputKeyClass);
        conf.setOutputValueClass(outputValueClass);
        conf.setOutputFormat(outputFormatClass);
    }
    // delete the existing target output folder
    FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(outputDir), true);

    // specify input and output DIRECTORIES (not files)
    FileInputFormat.addInputPath(conf, new Path(inputDir));
    FileOutputFormat.setOutputPath(conf, new Path(outputDir));
    return conf;        
}
}

My input is the following graph (the real input is in binary format; I show it here as text):

1   2
2   1,3,5
3   2,4
4   3,5
5   2,4

According to the logic of the code, the output should be:

1   NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2   NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],NW[nodeId=1,weight=1.0],}]
3   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]
4   NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],}]
5   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]

But the actual output is:

1   NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2   NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
3   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]
4   NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
5   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]

I cannot understand why the expected output is not produced. Any help would be appreciated.

Thanks.

You are running into object reuse:

while(values.hasNext()) {
    neighborList.add(values.next());
}

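values.next() returns the same object reference on every call, but the underlying contents of that object change on each iteration (the readFields method is called to repopulate it). Every element you add to the list is therefore the same object, and once the loop ends they all show the last value read.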
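To see the effect without a cluster, here is a minimal, Hadoop-free sketch. The Neighbor class and reusingIterator are hypothetical stand-ins (not Hadoop classes) that only imitate an iterator handing back one refilled instance; the ids and weights are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ObjectReuseDemo {
    /** Hypothetical mutable value standing in for NeighborWritable. */
    static final class Neighbor {
        String nodeId;
        double weight;

        Neighbor copy() {
            Neighbor n = new Neighbor();
            n.nodeId = nodeId;
            n.weight = weight;
            return n;
        }

        @Override
        public String toString() {
            return nodeId + ":" + weight;
        }
    }

    /** Returns an iterator that refills and hands back ONE shared instance. */
    static Iterator<Neighbor> reusingIterator(final String[] ids, final double[] weights) {
        final Neighbor shared = new Neighbor();
        return new Iterator<Neighbor>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < ids.length;
            }

            @Override
            public Neighbor next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.nodeId = ids[i];      // overwrite the shared object in place
                shared.weight = weights[i];
                i++;
                return shared;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /** Buggy pattern: stores the reference, so every element is the same object. */
    static List<String> collectByReference(Iterator<Neighbor> values) {
        List<Neighbor> list = new ArrayList<Neighbor>();
        while (values.hasNext()) {
            list.add(values.next());
        }
        return render(list);
    }

    /** Fixed pattern: stores a copy taken before the next call to next(). */
    static List<String> collectByCopy(Iterator<Neighbor> values) {
        List<Neighbor> list = new ArrayList<Neighbor>();
        while (values.hasNext()) {
            list.add(values.next().copy());
        }
        return render(list);
    }

    private static List<String> render(List<Neighbor> list) {
        List<String> out = new ArrayList<String>();
        for (Neighbor n : list) {
            out.add(n.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        String[] ids = {"1", "3", "5"};
        double[] weights = {1.0, 0.5, 0.5};
        System.out.println(collectByReference(reusingIterator(ids, weights))); // [5:0.5, 5:0.5, 5:0.5]
        System.out.println(collectByCopy(reusingIterator(ids, weights)));      // [1:1.0, 3:0.5, 5:0.5]
    }
}
```

The first line printed has the right number of entries but repeats the last value, which is the same symptom as the output in the question.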

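I suggest you change it to the following (you will need to get the Configuration variable conf from the setup method, unless you can get it from the Reporter or OutputCollector; sorry, I don't use the old API):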

// conf is a Configuration, e.g. the JobConf saved in configure(JobConf) under the old API
while(values.hasNext()) {
    neighborList.add(
        ReflectionUtils.copy(conf, values.next(), new NeighborWritable()));
}
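As far as I know, ReflectionUtils.copy makes its copy by serializing the source and deserializing into the destination. If getting hold of conf is awkward, the same idea can be sketched by hand with the write and readFields methods. The version below is a Hadoop-free illustration: SimpleWritable and Neighbor are hypothetical stand-ins for Writable and NeighborWritable, and the copy helper is my own name, not a Hadoop API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RoundTripCopyDemo {
    /** Hypothetical stand-in for the write/readFields contract of a Writable. */
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    /** Hypothetical value type standing in for NeighborWritable. */
    static final class Neighbor implements SimpleWritable {
        String nodeId = "";
        double weight;

        Neighbor() { }

        Neighbor(String nodeId, double weight) {
            this.nodeId = nodeId;
            this.weight = weight;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(nodeId);
            out.writeDouble(weight);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            nodeId = in.readUTF();
            weight = in.readDouble();
        }
    }

    /** Copies src into dst by serializing to a byte buffer and reading it back. */
    static <T extends SimpleWritable> T copy(T src, T dst) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            src.write(out);
            out.flush();
            dst.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return dst;
        } catch (IOException e) {
            // In-memory streams should not fail, so rethrow unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Neighbor shared = new Neighbor("2", 1.0 / 3);
        Neighbor saved = copy(shared, new Neighbor());
        shared.nodeId = "4";   // simulate the framework refilling the shared object
        shared.weight = 0.5;
        System.out.println(saved.nodeId + " " + saved.weight);
    }
}
```

The saved copy keeps the values it had when it was taken, whatever happens to the shared instance afterwards. In a real job the one-line ReflectionUtils.copy call is probably still the simpler choice once conf is available.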

But I still don't understand why my unit test passed at the time. Here is the code:

public class UWLTInitReducerTest {
private Text key;
private Iterator<NeighborWritable> values;
private NeighborArrayWritable nodeData;
private TTReducer reducer;
/**
 * Set up the states for calling the map function
 */
@Before
public void setUp() throws Exception {
    key = new Text("1001");
    NeighborWritable[] neighbors = new NeighborWritable[4];
    for (int i = 0; i < 4; i++) {
        neighbors[i] = new NeighborWritable(new Text("300" + i), new DoubleWritable((double) 1 / (1 + i)));
    }
    values = Arrays.asList(neighbors).iterator();
    nodeData = new NeighborArrayWritable(neighbors);
    reducer = new TTReducer();
}
/**
 * Test method for InitModelMapper#map - valid input
 */
@Test
public void testMapValid() {
    // mock the output object
    OutputCollector<Text, UWLTNodeData> output = mock(OutputCollector.class);
    try {
        // call the API
        reducer.reduce(key, values, output, null);
        // in order (sequential) verification of the calls to output.collect()
        verify(output).collect(key, nodeData);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

Why didn't this code catch the error?
