如何在MapReduce中使用多个字段



我想了解如何使用MapReduce模型聚合多个字段。

例如,如果我有一个这样的数据文件:

id, site, name, qty, price
00, testA, NameA,1,1
01, testB,NameA,2,3
02, testB,NameB,5,7

,并希望在MapReduce上实现此聚合:

select site,name, (qty*price) as total
from PO where name ='NameA' 
group by site,name,total 
order by site;

我该怎么做呢?

我能够按站点(键),总数(值)聚合,但不确定如何包括名称列。

我需要了解如何在MapReduce中使用多个字段。能给我举个例子吗?还是需要使用Hbase?

您可以实现WritableComparable并使用几个字段创建自己的CompositeKey,例如:

public static class CompositeKey implements WritableComparable<CompositeKey> {
    public final Text site;
    public final Text name;
    public final LongWritable total;
    public CompositeKey(Text site, Text name, LongWritable total) {
        this.site = site;
        this.name = name;
        this.total = total;
    }
    @Override
    public int compareTo(CompositeKey o) {
        int siteCmp = site.compareTo(o.site);
        if (siteCmp != 0) {
            return siteCmp;
        }
        int nameCmp = name.compareTo(o.name);
        if (nameCmp != 0) {
            return nameCmp;
        }
        return total.compareTo(o.total);
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CompositeKey that = (CompositeKey) o;
        if (name != null ? !name.equals(that.name) : that.name != null) return false;
        if (site != null ? !site.equals(that.site) : that.site != null) return false;
        if (total != null ? !total.equals(that.total) : that.total != null) return false;
        return true;
    }
    @Override
    public int hashCode() {
        int result = site != null ? site.hashCode() : 0;
        result = 31 * result + (name != null ? name.hashCode() : 0);
        result = 31 * result + (total != null ? total.hashCode() : 0);
        return result;
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        site.write(dataOutput);
        name.write(dataOutput);
        total.write(dataOutput);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        site.readFields(dataInput);
        name.readFields(dataInput);
        total.readFields(dataInput);
    }
}

相关内容

  • 没有找到相关文章

最新更新