MapReduce 使用复杂的键和值时,归约(reduce)结果没有按预期合并



无论我把复杂键的比较写得多么简单,都得不到预期的结果。只有当我给每条记录都使用同一个键时,它才会正确地归约成一条记录。我还发现,这种情况只在处理全量数据时才会发生;如果把一些没有被归约的记录单独拿出来、以小得多的规模运行,这些记录就会被正确地合并在一起。

输出记录的总和是正确的,但在记录级别上存在重复,而我本希望这些记录被合并在一起。例如,我预期得到 500 条记录、合计为 5000,结果却得到了 1232 条记录、合计同样为 5000,显然其中有些记录本应被归约为一条。

我读过关于对象引用、复杂键和值的相关问题,但看不出还有什么其他可能的原因。因此,你会看到我在一些可能并不需要的地方创建了新对象——我现在正在尝试各种办法,等程序能正常工作后再把这些多余的改动撤回。

我不知道接下来该尝试什么,也不知道该从哪里、以什么方式排查。请帮忙!

public static class Map extends
        Mapper<LongWritable, Text, IMSTranOut, IMSTranSums> {

    /** Number of comma-separated fields read from each input line (indices 0..10). */
    private static final int REQUIRED_FIELDS = 11;

    /**
     * Turns one comma-separated input line into a grouping key and a single-transaction value.
     *
     * <p>Field layout: 0 transaction code, 1 transaction type, 2/3 transaction date/hour,
     * 4/5 start date/hour, 6/7 stop date/hour, 8 inputQTime, 9 elapsedTime, 10 cpuTime.
     *
     * @param key     input key for the line (presumably its offset from the input format —
     *                TODO confirm); used only in error messages
     * @param value   the raw input line
     * @param context sink for the emitted (key, value) pair
     * @throws IOException if the line has too few fields or a date field cannot be parsed
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] tokens = line.split(",", -1);
        if (tokens.length < REQUIRED_FIELDS) {
            throw new IOException("Expected " + REQUIRED_FIELDS
                    + " comma-separated fields but got " + tokens.length
                    + " at input key " + key + ": " + line);
        }

        // A fresh formatter per call: SimpleDateFormat is not thread-safe, so never share one.
        // NOTE(review): lowercase "ddd" is day-of-MONTH. A Julian value such as 2013045 only
        // becomes day-of-year 45 because lenient parsing rolls "January 45th" into February.
        // "yyyyDDD" would state that intent directly — confirm the input format before changing.
        SimpleDateFormat dtFormat = new SimpleDateFormat("yyyyddd");

        IMSTranOut dbKey = new IMSTranOut();
        // -99 placeholders; presumably the real surrogate keys are resolved later (TODO confirm).
        dbKey.setLoadKey(-99);
        dbKey.setTranClassKey(-99);
        dbKey.setTransactionCode(tokens[0]);
        dbKey.setTransactionType(tokens[1]);
        dbKey.setNpaNxx(getNPA(dbKey.getTransactionCode()));
        dbKey.setTranDate(parseDate(dtFormat, tokens[2], "tranDate", key));
        dbKey.setTranHour(getTranHour(tokens[3]));
        dbKey.setStartDate(parseDate(dtFormat, tokens[4], "startDate", key));
        dbKey.setStartHour(getTranHour(tokens[5]));
        dbKey.setStopDate(parseDate(dtFormat, tokens[6], "stopDate", key));
        dbKey.setStopHour(getTranHour(tokens[7]));

        // Each input line represents exactly one transaction.
        IMSTranSums sumVals = new IMSTranSums();
        sumVals.setTranCount(1);
        sumVals.setInputQTime(Double.parseDouble(tokens[8]));
        sumVals.setElapsedTime(Double.parseDouble(tokens[9]));
        sumVals.setCpuTime(Double.parseDouble(tokens[10]));
        context.write(dbKey, sumVals);
    }

    /**
     * Parses {@code text} with {@code format} into a {@link Date}.
     *
     * @param format   formatter to use (confined to the calling thread)
     * @param text     raw date text from the input line
     * @param field    field name, for the error message
     * @param inputKey input key of the line, for the error message
     * @return the parsed date
     * @throws IOException if {@code text} cannot be parsed; the ParseException is the cause
     */
    private static Date parseDate(SimpleDateFormat format, String text, String field,
            LongWritable inputKey) throws IOException {
        try {
            return new Date(format.parse(text).getTime());
        } catch (ParseException e) {
            // Fail here with context rather than leave the date null: the key's
            // write(DataOutput) dereferences every date field.
            throw new IOException("Cannot parse " + field + " '" + text
                    + "' at input key " + inputKey, e);
        }
    }
}
public static class Reduce extends
        Reducer<IMSTranOut, IMSTranSums, IMSTranOut, IMSTranSums> {

    /**
     * Sums every partial measure received for {@code key} and emits one aggregated record.
     *
     * <p>The totals are written both into the output value and into the measure fields of
     * the output key.
     *
     * @param key     grouping key shared by all {@code values}
     * @param values  partial sums for this key
     * @param context sink for the aggregated (key, value) pair
     */
    @Override
    public void reduce(IMSTranOut key, Iterable<IMSTranSums> values,
            Context context) throws IOException, InterruptedException {
        int tranCount = 0;
        double inputQ = 0;
        double elapsed = 0;
        double cpu = 0;
        for (IMSTranSums val : values) {
            tranCount += val.getTranCount();
            inputQ += val.getInputQTime();
            elapsed += val.getElapsedTime();
            cpu += val.getCpuTime();
        }

        // Each total goes into its own field; earlier the CPU and input-queue sums were swapped.
        IMSTranSums sumVals = new IMSTranSums();
        sumVals.setInputQTime(inputQ);
        sumVals.setElapsedTime(elapsed);
        sumVals.setCpuTime(cpu);
        sumVals.setTranCount(tranCount);

        // Copy the grouping fields into a fresh key instead of mutating the supplied key object.
        IMSTranOut dbKey = new IMSTranOut();
        dbKey.setLoadKey(key.getLoadKey());
        dbKey.setTranClassKey(key.getTranClassKey());
        dbKey.setNpaNxx(key.getNpaNxx());
        dbKey.setTransactionCode(key.getTransactionCode());
        dbKey.setTransactionType(key.getTransactionType());
        dbKey.setTranDate(key.getTranDate());
        dbKey.setTranHour(key.getTranHour());
        dbKey.setStartDate(key.getStartDate());
        dbKey.setStartHour(key.getStartHour());
        dbKey.setStopDate(key.getStopDate());
        dbKey.setStopHour(key.getStopHour());
        dbKey.setInputQTime(inputQ);
        dbKey.setElapsedTime(elapsed);
        dbKey.setCpuTime(cpu);
        dbKey.setTranCount(tranCount);

        context.write(dbKey, sumVals);
    }
}

以下是DBWritable类的实现:

public class IMSTranOut implements DBWritable,
    WritableComparable<IMSTranOut> {
private int loadKey;
private int tranClassKey;
private String npaNxx;
private String transactionCode;
private String transactionType;
private Date tranDate;
private double tranHour;
private Date startDate;
private double startHour;
private Date stopDate;
private double stopHour;
private double inputQTime;
private double elapsedTime;
private double cpuTime;
private int tranCount;
public void readFields(ResultSet rs) throws SQLException {
    setLoadKey(rs.getInt("LOAD_KEY"));
    setTranClassKey(rs.getInt("TRAN_CLASS_KEY"));
    setNpaNxx(rs.getString("NPA_NXX"));
    setTransactionCode(rs.getString("TRANSACTION_CODE"));
    setTransactionType(rs.getString("TRANSACTION_TYPE"));
    setTranDate(rs.getDate("TRAN_DATE"));
    setTranHour(rs.getInt("TRAN_HOUR"));
    setStartDate(rs.getDate("START_DATE"));
    setStartHour(rs.getInt("START_HOUR"));
    setStopDate(rs.getDate("STOP_DATE"));
    setStopHour(rs.getInt("STOP_HOUR"));
    setInputQTime(rs.getInt("INPUT_Q_TIME"));
    setElapsedTime(rs.getInt("ELAPSED_TIME"));
    setCpuTime(rs.getInt("CPU_TIME"));
    setTranCount(rs.getInt("TRAN_COUNT"));
}
public void write(PreparedStatement ps) throws SQLException {
    ps.setInt(1, loadKey);
    ps.setInt(2, tranClassKey);
    ps.setString(3, npaNxx);
    ps.setString(4, transactionCode);
    ps.setString(5, transactionType);
    ps.setDate(6, tranDate);
    ps.setDouble(7, tranHour);
    ps.setDate(8, startDate);
    ps.setDouble(9, startHour);
    ps.setDate(10, stopDate);
    ps.setDouble(11, stopHour);
    ps.setDouble(12, inputQTime);
    ps.setDouble(13, elapsedTime);
    ps.setDouble(14, cpuTime);
    ps.setInt(15, tranCount);
}
public int getLoadKey() {
    return loadKey;
}
public void setLoadKey(int loadKey) {
    this.loadKey = loadKey;
}
public int getTranClassKey() {
    return tranClassKey;
}
public void setTranClassKey(int tranClassKey) {
    this.tranClassKey = tranClassKey;
}
public String getNpaNxx() {
    return npaNxx;
}
public void setNpaNxx(String npaNxx) {
    this.npaNxx = new String(npaNxx);
}
public String getTransactionCode() {
    return transactionCode;
}
public void setTransactionCode(String transactionCode) {
    this.transactionCode = new String(transactionCode);
}
public String getTransactionType() {
    return transactionType;
}
public void setTransactionType(String transactionType) {
    this.transactionType = new String(transactionType);
}
public Date getTranDate() {
    return tranDate;
}
public void setTranDate(Date tranDate) {
    this.tranDate = new Date(tranDate.getTime());
}
public double getTranHour() {
    return tranHour;
}
public void setTranHour(double tranHour) {
    this.tranHour = tranHour;
}
public Date getStartDate() {
    return startDate;
}
public void setStartDate(Date startDate) {
    this.startDate = new Date(startDate.getTime());
}
public double getStartHour() {
    return startHour;
}
public void setStartHour(double startHour) {
    this.startHour = startHour;
}
public Date getStopDate() {
    return stopDate;
}
public void setStopDate(Date stopDate) {
    this.stopDate = new Date(stopDate.getTime());
}
public double getStopHour() {
    return stopHour;
}
public void setStopHour(double stopHour) {
    this.stopHour = stopHour;
}
public double getInputQTime() {
    return inputQTime;
}
public void setInputQTime(double inputQTime) {
    this.inputQTime = inputQTime;
}
public double getElapsedTime() {
    return elapsedTime;
}
public void setElapsedTime(double elapsedTime) {
    this.elapsedTime = elapsedTime;
}
public double getCpuTime() {
    return cpuTime;
}
public void setCpuTime(double cpuTime) {
    this.cpuTime = cpuTime;
}
public int getTranCount() {
    return tranCount;
}
public void setTranCount(int tranCount) {
    this.tranCount = tranCount;
}
public void readFields(DataInput input) throws IOException {
    setNpaNxx(input.readUTF());
    setTransactionCode(input.readUTF());
    setTransactionType(input.readUTF());
    setTranDate(new Date(input.readLong()));
    setStartDate(new Date(input.readLong()));
    setStopDate(new Date(input.readLong()));
    setLoadKey(input.readInt());
    setTranClassKey(input.readInt());
    setTranHour(input.readDouble());
    setStartHour(input.readDouble());
    setStopHour(input.readDouble());
    setInputQTime(input.readDouble());
    setElapsedTime(input.readDouble());
    setCpuTime(input.readDouble());
    setTranCount(input.readInt());
}
public void write(DataOutput output) throws IOException {
    output.writeUTF(npaNxx);
    output.writeUTF(transactionCode);
    output.writeUTF(transactionType);
    output.writeLong(tranDate.getTime());
    output.writeLong(startDate.getTime());
    output.writeLong(stopDate.getTime());
    output.writeInt(loadKey);
    output.writeInt(tranClassKey);
    output.writeDouble(tranHour);
    output.writeDouble(startHour);
    output.writeDouble(stopHour);
    output.writeDouble(inputQTime);
    output.writeDouble(elapsedTime);
    output.writeDouble(cpuTime);
    output.writeInt(tranCount);
}
public int compareTo(IMSTranOut o) {
    return (Integer.compare(loadKey, o.getLoadKey()) == 0
            && Integer.compare(tranClassKey, o.getTranClassKey()) == 0
            && npaNxx.compareTo(o.getNpaNxx()) == 0
            && transactionCode.compareTo(o.getTransactionCode()) == 0
            && (transactionType.compareTo(o.getTransactionType()) == 0)
            && tranDate.compareTo(o.getTranDate()) == 0
            && Double.compare(tranHour, o.getTranHour()) == 0
            && startDate.compareTo(o.getStartDate()) == 0
            && Double.compare(startHour, o.getStartHour()) == 0
            && stopDate.compareTo(o.getStopDate()) == 0 
            && Double.compare(stopHour, o.getStopHour()) == 0) ? 0 : 1;
}
}

复杂值(实现 Writable 接口的类)的实现:

public class IMSTranSums
 implements Writable{

        private double inputQTime;
        private double elapsedTime;
        private double cpuTime;
        private int tranCount;
        public double getInputQTime() {
            return inputQTime;
        }
        public void setInputQTime(double inputQTime) {
            this.inputQTime = inputQTime;
        }
        public double getElapsedTime() {
            return elapsedTime;
        }
        public void setElapsedTime(double elapsedTime) {
            this.elapsedTime = elapsedTime;
        }
        public double getCpuTime() {
            return cpuTime;
        }
        public void setCpuTime(double cpuTime) {
            this.cpuTime = cpuTime;
        }
        public int getTranCount() {
            return tranCount;
        }
        public void setTranCount(int tranCount) {
            this.tranCount = tranCount;
        }
        public void write(DataOutput output) throws IOException {
            output.writeDouble(inputQTime);
            output.writeDouble(elapsedTime);
            output.writeDouble(cpuTime);
            output.writeInt(tranCount);
        }
        public void readFields(DataInput input) throws IOException {
            inputQTime=input.readDouble();
            elapsedTime=input.readDouble();
            cpuTime=input.readDouble();
            tranCount=input.readInt();
        }
}

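您的 compareTo 有缺陷:只要两个键不完全相同,它就一律返回 1。这样 a.compareTo(b) 和 b.compareTo(a) 可能同时为 1,既违反了反对称性,也破坏了传递性,会使排序算法完全失效——相同的键不一定会排在一起,也就无法被分到同一组进行归约。另外,IMSTranOut 没有重写 hashCode()(和 equals()),而 Hadoop 默认的 HashPartitioner 是根据 hashCode() 决定把键发往哪个 reducer 的;继承自 Object 的 hashCode() 基于对象身份,相同的键因此可能被发往不同的 reducer。这很可能也是问题只在全量运行(多个 map 任务和 reducer)时出现的原因之一。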

我建议您使用Apache Commons的CompareToBuilder或Guava的ComparisonChain,以使您的比较更加可读(正确!)。

相关内容

  • 没有找到相关文章

最新更新