Hadoop map reduce用于查找基于累积能量读数的最大小时消耗



确定所有住宅单元的最大小时用电量所有的日期。你的处理应该产生一个单一的值。

我有这个样本数据

LOG_ID     HOUSE_ID CONDATE     CONHOUR ENERGY_READING  FLAG
3682572 16  2019-01-01   05:21:50   11143.735496    0
3682573 16  2019-01-01  05:22:00    11143.738274    0
3682574 16  2019-01-01  05:22:10    11143.741052    0
3682575 16  2019-01-01  05:22:20    11143.74383 0
3682576 16  2019-01-01  05:22:30    11143.746608    0
3682577 16  2019-01-01  05:22:40    11143.749386    0
3682578 16  2019-01-01  05:22:50    11143.752164    0
3682579 16  2019-01-01  05:23:00    11143.754942    0
3682580 16  2019-01-01  05:23:10    11143.75772 0
3682581 16  2019-01-01  05:23:20    11143.760498    0
3682582 16  2019-01-01  05:23:30    11143.763276    0
3682583 16  2019-01-01  05:23:40    11143.766054    0
3682584 16  2019-01-01  05:23:50    11143.768832    0

是样例数据。能源消耗是累积的。所以这里我想找到每天的最大小时消耗。

我尝试:Mapper类

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class EnergyMapper
        extends Mapper<LongWritable, Text, Text, FloatWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        String[] values = value.toString().split("\s+");
        String date;
        int houseId;
        String time;
        double energyReading;
        EnergyValues ev;
        try {
            date = values[2];
            houseId = Integer.parseInt(values[1]);
            time = values[3];
            energyReading = Double.parseDouble(values[4]);
            ev = new EnergyValues(houseId,time,energyReading,date);
        }
        catch (Exception e){
            date = "NA";
            ev = new EnergyValues();
        }
        context.write(new Text(date), ev);
    }
}

减速机类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.io.IOException;

public class EnergyReducer extends Reducer<Text, EnergyValues, Text, EnergyValues> {
    @Override
    protected void reduce(Text key, Iterable<EnergyValues> sales, Context context)
            throws IOException, InterruptedException {
        String Data = "";
     //   int i = 0;
        double  Consumption = 0.0D;
        double  minConsumption = 0.0D;
        double  MaxConsumption = 0.0D;
        for (EnergyValues amount: sales) {
            //maxConsumption =
            try {
                minConsumption = getHourlyComp(key,sales,amount.getTime());
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
            Consumption = minConsumption - amount.getEnergyReading() ;
            System.out.println("outside function +++++++++++++++++++++++++");
            System.out.println(amount.getEnergyReading() );

            MaxConsumption = Math.max(MaxConsumption, Consumption);
            Data =  amount.getHouseId()+","+ ","+ amount.getTime() + "," + amount.getEnergyReading();
        }

        context.write(new Text(String.valueOf(MaxConsumption)), new EnergyValues(Data));
    }
    public double getHourlyComp(Text key, Iterable<EnergyValues> sales, String time)throws ParseException{
        float maxConsumption = 0.0F;
        for (EnergyValues amount: sales) {
               // String hours = time.substring(0, 2);
               // if (hours != "23"){
                    if (key.toString() == amount.getDate()) {
                        String myTime = time;
                        SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS");
                        Date d = df.parse(myTime);
                        Calendar cal = Calendar.getInstance();
                        cal.setTime(d);
                        cal.add(Calendar.HOUR, 1);
                        String newTime = df.format(cal.getTime());
                        if (newTime == amount.getTime()) {
                            System.out.println("Inside function ----------------------");
                            System.out.println(amount.getEnergyReading() );
                            return amount.getEnergyReading();
                        }
                    }
          //  }
        }
        return 0;
    }
}

能量值等级

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class EnergyValues implements Writable {
    int houseId;
    String time;
    double energyReading;
    String date;
    String data;
    public EnergyValues(){
        this.houseId = 0;
        this.time = "";
        this.energyReading =0.0;
        this.date = "";
    }
    public EnergyValues( String value2) {
        this.data = value2;
        }
    public EnergyValues(int val1, String val2, double val3,String val4) {
        this.houseId = val1;
        this.time = val2;
        this.energyReading = val3;
        this.date =val4;
    }
    public int getHouseId() {
        return houseId;
    }
    public String getTime() {return time; }
    public double getEnergyReading(){return energyReading;}
    public String getDate(){return date;}
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(houseId);
        dataOutput.writeChars(time);
        dataOutput.writeDouble(energyReading);
        dataOutput.writeChars(date);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        houseId = dataInput.readInt();
        time = String.valueOf(dataInput.readChar());
        energyReading = dataInput.readDouble();
        date = String.valueOf(dataInput.readChar());
    }

}

但是当我运行这个,我得到这个结果


0.0     EnergyValues@f79a760
0.0     EnergyValues@14f5da2c
0.0     EnergyValues@239b0f9d
0.0     EnergyValues@619bfe29
0.0     EnergyValues@1eb6749b
0.0     EnergyValues@652a7737
0.0     EnergyValues@2bef51f2
0.0     EnergyValues@650eab8

在Energy Value class类中你需要重写该类的to string方法,在你的reducer中传递to_string,你这样做的方式是传递对象的值而不是字符串

相关内容

  • 没有找到相关文章

最新更新