Hadoop 不应该只根据哈希代码在 reducer 中分组 <key,(值列表)吗?



我决定创建自己的WritableComparable类来学习Hadoop是如何使用它的。因此,我创建了一个带有两个实例变量的Order类(orderNumber客户机),并实现了所需的方法。我还使用Eclipse生成器生成getter/setter/hashCode/equals/toString。

在compareTo中,我决定只使用orderNumber变量。

我创建了一个简单的MapReduce作业,仅用于统计数据集中某个顺序的出现次数。由于错误,我的一个测试记录是Ita而不是it,正如您在这里看到的:

123 Ita
123 Itá
123 Itá
345 Carol
345 Carol
345 Carol
345 Carol
456 Iza Smith

据我所知,第一条记录应被视为不同的顺序,因为记录1 hashCode与记录2和3 hashCode不同。

但在reduce阶段,3条记录被分组在一起。如你所见:

Order [cliente=Ita, orderNumber=123]    3
Order [cliente=Carol, orderNumber=345]  4
Order [cliente=Iza Smith, orderNumber=456]  1

我认为它应该有一行用来存放计数为2的记录,而它应该有计数为1的记录。

因为我在compareTo中只使用了orderNumber,所以我尝试在此方法中使用String客户端(在下面的代码中注释)。然后,它就像我期望的那样工作了。

那么,这是预期的结果吗?hadoop不应该只使用hashCode来分组键及其值吗?

下面是Order类(我提交了getter/setter):

public class Order implements WritableComparable<Order>
{
private String cliente;
private long orderNumber;

@Override
public void readFields(DataInput in) throws IOException 
{
    cliente = in.readUTF();
    orderNumber = in.readLong();
}

@Override
public void write(DataOutput out) throws IOException 
{
    out.writeUTF(cliente);
    out.writeLong(orderNumber);
}
@Override
public int compareTo(Order o) {
    long thisValue = this.orderNumber;
    long thatValue = o.orderNumber;
    return (thisValue < thatValue ? -1 :(thisValue == thatValue ? 0 :1));
    //return this.cliente.compareTo(o.cliente);
}
@Override
public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((cliente == null) ? 0 : cliente.hashCode());
    result = prime * result + (int) (orderNumber ^ (orderNumber >>> 32));
    return result;
}

@Override
public boolean equals(Object obj) {
    if (this == obj)
        return true;
    if (obj == null)
        return false;
    if (getClass() != obj.getClass())
        return false;
    Order other = (Order) obj;
    if (cliente == null) {
        if (other.cliente != null)
            return false;
    } else if (!cliente.equals(other.cliente))
        return false;
    if (orderNumber != other.orderNumber)
        return false;
    return true;
}

@Override
public String toString() {
    return "Order [cliente=" + cliente + ", orderNumber=" + orderNumber + "]";
}

下面是MapReduce的代码:

public class TesteCustomClass extends Configured implements Tool
{
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Order, LongWritable>
{
    LongWritable outputValue = new LongWritable();
    String[] campos;
    Order order = new Order();
        @Override
    public void configure(JobConf job)
    {
    }
    @Override
    public void map(LongWritable key, Text value, OutputCollector<Order, LongWritable> output, Reporter reporter) throws IOException 
            {
        campos = value.toString().split("t");
            order.setOrderNumber(Long.parseLong(campos[0]));
        order.setCliente(campos[1]);
        outputValue.set(1L);
        output.collect(order, outputValue);
    }
}
public static class Reduce extends MapReduceBase implements Reducer<Order, LongWritable, Order,LongWritable>
{
    @Override
    public void reduce(Order key, Iterator<LongWritable> values,OutputCollector<Order,LongWritable> output, Reporter reporter) throws IOException 
    {
        LongWritable value = new LongWritable(0);
        while (values.hasNext())
        {
            value.set(value.get() + values.next().get());
        }
        output.collect(key, value);
    }
}
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(),TesteCustomClass.class);
    conf.setMapperClass(Map.class);
    //  conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setJobName("Teste - Custom Classes");
    conf.setOutputKeyClass(Order.class);
    conf.setOutputValueClass(LongWritable.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
    return 0;
}
public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(),new TesteCustomClass(),args);
    System.exit(res);
}
}

默认分区器是HashPartitioner,它使用hashCode方法来确定将K,V对发送到哪个reducer。

一旦在reducer中(或者如果你使用的是在map端运行的Combiner), compareTo方法用于对键进行排序,然后也使用(默认情况下)来比较顺序键是否应该分组在一起,以及它们的关联值是否应该在同一迭代中减少。

如果你不使用cliente键变量,只有你的orderNumber变量在你的compareTo方法,那么任何键与相同的orderNumber将有其值减少在一起-不管cliente值(这是你目前正在观察的)

相关内容

  • 没有找到相关文章

最新更新