确定所有住宅单元的最大小时用电量所有的日期。你的处理应该产生一个单一的值。
我有这个样本数据
LOG_ID HOUSE_ID CONDATE CONHOUR ENERGY_READING FLAG
3682572 16 2019-01-01 05:21:50 11143.735496 0
3682573 16 2019-01-01 05:22:00 11143.738274 0
3682574 16 2019-01-01 05:22:10 11143.741052 0
3682575 16 2019-01-01 05:22:20 11143.74383 0
3682576 16 2019-01-01 05:22:30 11143.746608 0
3682577 16 2019-01-01 05:22:40 11143.749386 0
3682578 16 2019-01-01 05:22:50 11143.752164 0
3682579 16 2019-01-01 05:23:00 11143.754942 0
3682580 16 2019-01-01 05:23:10 11143.75772 0
3682581 16 2019-01-01 05:23:20 11143.760498 0
3682582 16 2019-01-01 05:23:30 11143.763276 0
3682583 16 2019-01-01 05:23:40 11143.766054 0
3682584 16 2019-01-01 05:23:50 11143.768832 0
是样例数据。能源消耗是累积的。所以这里我想找到每天的最大小时消耗。
我尝试:Mapper类
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class EnergyMapper
extends Mapper<LongWritable, Text, Text, FloatWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String[] values = value.toString().split("\s+");
String date;
int houseId;
String time;
double energyReading;
EnergyValues ev;
try {
date = values[2];
houseId = Integer.parseInt(values[1]);
time = values[3];
energyReading = Double.parseDouble(values[4]);
ev = new EnergyValues(houseId,time,energyReading,date);
}
catch (Exception e){
date = "NA";
ev = new EnergyValues();
}
context.write(new Text(date), ev);
}
}
减速机类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.io.IOException;
public class EnergyReducer extends Reducer<Text, EnergyValues, Text, EnergyValues> {
@Override
protected void reduce(Text key, Iterable<EnergyValues> sales, Context context)
throws IOException, InterruptedException {
String Data = "";
// int i = 0;
double Consumption = 0.0D;
double minConsumption = 0.0D;
double MaxConsumption = 0.0D;
for (EnergyValues amount: sales) {
//maxConsumption =
try {
minConsumption = getHourlyComp(key,sales,amount.getTime());
} catch (ParseException e) {
throw new RuntimeException(e);
}
Consumption = minConsumption - amount.getEnergyReading() ;
System.out.println("outside function +++++++++++++++++++++++++");
System.out.println(amount.getEnergyReading() );
MaxConsumption = Math.max(MaxConsumption, Consumption);
Data = amount.getHouseId()+","+ ","+ amount.getTime() + "," + amount.getEnergyReading();
}
context.write(new Text(String.valueOf(MaxConsumption)), new EnergyValues(Data));
}
public double getHourlyComp(Text key, Iterable<EnergyValues> sales, String time)throws ParseException{
float maxConsumption = 0.0F;
for (EnergyValues amount: sales) {
// String hours = time.substring(0, 2);
// if (hours != "23"){
if (key.toString() == amount.getDate()) {
String myTime = time;
SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS");
Date d = df.parse(myTime);
Calendar cal = Calendar.getInstance();
cal.setTime(d);
cal.add(Calendar.HOUR, 1);
String newTime = df.format(cal.getTime());
if (newTime == amount.getTime()) {
System.out.println("Inside function ----------------------");
System.out.println(amount.getEnergyReading() );
return amount.getEnergyReading();
}
}
// }
}
return 0;
}
}
能量值等级
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class EnergyValues implements Writable {
int houseId;
String time;
double energyReading;
String date;
String data;
public EnergyValues(){
this.houseId = 0;
this.time = "";
this.energyReading =0.0;
this.date = "";
}
public EnergyValues( String value2) {
this.data = value2;
}
public EnergyValues(int val1, String val2, double val3,String val4) {
this.houseId = val1;
this.time = val2;
this.energyReading = val3;
this.date =val4;
}
public int getHouseId() {
return houseId;
}
public String getTime() {return time; }
public double getEnergyReading(){return energyReading;}
public String getDate(){return date;}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(houseId);
dataOutput.writeChars(time);
dataOutput.writeDouble(energyReading);
dataOutput.writeChars(date);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
houseId = dataInput.readInt();
time = String.valueOf(dataInput.readChar());
energyReading = dataInput.readDouble();
date = String.valueOf(dataInput.readChar());
}
}
但是当我运行这个,我得到这个结果
0.0 EnergyValues@f79a760
0.0 EnergyValues@14f5da2c
0.0 EnergyValues@239b0f9d
0.0 EnergyValues@619bfe29
0.0 EnergyValues@1eb6749b
0.0 EnergyValues@652a7737
0.0 EnergyValues@2bef51f2
0.0 EnergyValues@650eab8
在Energy Value class类中你需要重写该类的to string方法,在你的reducer中传递to_string,你这样做的方式是传递对象的值而不是字符串