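I recently started learning MapReduce programming, and to practice I picked a scenario: I have sample data with an account number, a balance and a transaction date, and I want the most recent transaction for each account number.
Here is my input: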
+-------+-------+------------+
| accno | bal | date |
+-------+-------+------------+
| 13611 | 3360 | 2015-09-18 |
| 13611 | 1500 | 2015-09-19 |
| 13620 | 10000 | 2015-09-17 |
| 13620 | 6000 | 2015-09-18 |
| 13620 | 3000 | 2015-09-19 |
| 13631 | 5000 | 2015-09-16 |
| 13631 | 3500 | 2015-09-18 |
| 13621 | 3000 | 2015-09-10 |
| 13621 | 1800 | 2015-09-15 |
+-------+-------+------------+
Expected output:
+-------+-------+------------+
| accno | bal | Date |
+-------+-------+------------+
| 13611 | 1500 | 2015-09-19 |
| 13620 | 3000 | 2015-09-19 |
| 13631 | 3500 | 2015-09-18 |
| 13621 | 1800 | 2015-09-15 |
+-------+-------+------------+
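I'm trying to write the code, but I'm stuck on how to get the latest date for a given key. My approach is:
1) Read the input and emit acc_no as the key and the line of text as the value.
2) Partition the data on the key (i.e. acc_no).
3) In the reduce phase, implement the logic that picks the record with the latest date.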
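```java
// Driver code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EmployeeDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());
        // job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(EmployessPartitioner.class);
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(4);
        // Forward slashes avoid invalid escape sequences such as "\d" in Java string literals
        FileInputFormat.addInputPath(job, new Path("D:/datatoload/HotelCloutMap/Emplyoee/Input/in.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:/datatoload/HotelCloutMap/Emplyoee/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new EmployeeDriver(), args));
    }
}
```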
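```java
// Mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (account number, whole input line) for every record
public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private final LongWritable accNo = new LongWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(",");
        accNo.set(Long.parseLong(line[0].trim()));
        context.write(accNo, value);
    }
}
```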
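```java
// Reducer
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String date = "", sal = "";
        for (Text val : values) {
            String[] str = val.toString().split(",");
            // Each iteration overwrites the previous record, so whichever value arrives last is kept
            sal = str[1];
            date = str[2];
        }
        context.write(key, new Text(sal + " " + date));
    }
}
```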
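The loop in this reducer overwrites `sal` and `date` on every iteration, so it emits whichever value happens to arrive last, and Hadoop does not guarantee the order of the values for a key. One way to fix this without secondary sort is to keep the value with the greatest date while iterating. The sketch below models only that selection logic in plain Java; the names `LatestRecordPicker` and `pickLatest` are hypothetical, and it assumes comma-separated lines of the form `accno,bal,yyyy-MM-dd`, for which plain string comparison of the date orders records chronologically.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: the selection logic a reducer could apply to the values of one key.
// Assumes each value is "accno,bal,yyyy-MM-dd"; ISO dates sort correctly as strings.
class LatestRecordPicker {

    // Returns "bal date" for the value with the most recent date, or null if there are none.
    static String pickLatest(Iterable<String> values) {
        String bestDate = null;
        String bestBal = null;
        for (String v : values) {
            String[] f = v.split(",");
            String bal = f[1].trim();
            String date = f[2].trim();
            // Keep this record only if it is newer than the best one seen so far.
            if (bestDate == null || date.compareTo(bestDate) > 0) {
                bestDate = date;
                bestBal = bal;
            }
        }
        return bestDate == null ? null : bestBal + " " + bestDate;
    }

    public static void main(String[] args) {
        List<String> acc13620 = Arrays.asList(
                "13620,6000,2015-09-18",
                "13620,10000,2015-09-17",
                "13620,3000,2015-09-19");
        System.out.println(pickLatest(acc13620)); // prints "3000 2015-09-19"
    }
}
```

Inside `reduce()` the same comparison would be applied to `val.toString()`; converting to `String` matters because Hadoop reuses the same `Text` instance across iterations of the values.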
```java
// Partitioner
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class EmployessPartitioner extends Partitioner<LongWritable, Text> {
    @Override
    public int getPartition(LongWritable key, Text value, int numOfReduceTasks) {
        if (numOfReduceTasks == 0) {
            return 0;
        }
        // The key already holds the account number, so there is no need to re-parse the value
        long accNo = key.get();
        if (accNo == 13611) {
            return 0;
        } else if (accNo == 13620) {
            return 1;
        } else if (accNo == 13631) {
            return 2;
        } else {
            return 3;
        }
    }
}
```
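This partitioner hard-codes four account numbers and sends every other account to partition 3, so it has to be edited whenever a new account appears. Because all records for one account already share the same key, partitioning on a hash of the key is enough; Hadoop's default `HashPartitioner` computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The plain-Java sketch below models that formula for a `long` account number (`AccountPartitioning` and `partitionFor` are hypothetical names); in the real job the same expression would go in `getPartition()`, using `key.get()`.

```java
// Hypothetical model of hash partitioning on the account number.
class AccountPartitioning {

    // Maps an account number to a reducer index in [0, numReduceTasks).
    static int partitionFor(long accNo, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            return 0; // no reducers configured: everything goes to partition 0
        }
        // Mask the sign bit so the result of % is never negative.
        return (Long.hashCode(accNo) & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        long[] accounts = {13611L, 13620L, 13631L, 13621L};
        for (long acc : accounts) {
            System.out.println(acc + " -> " + partitionFor(acc, 4));
        }
    }
}
```

Unlike the hard-coded version, two accounts may land on the same reducer (13611 and 13631 both map to 3 here); that is harmless, because `reduce()` is still called once per distinct key.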
My program gives me output like this:
13611 3360 2015-09-18
13620 10000 2015-09-17
13631 5000 2015-09-16
13621 3000 2015-09-10
So how do I get the latest record for each account number? Thanks in advance.
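You can do this with secondary sorting. Secondary sort orders the values that belong to each key before they are passed to the reducer, so the reducer receives them as an already sorted list. In Hadoop this takes a composite key that carries both the account number and the date, a sort comparator that orders those keys by account number and then by date, and a custom grouping comparator that compares only the account number, so that all records for one account still arrive in a single reduce() call. The partitioner must likewise look only at the account number.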
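To make the ordering concrete, here is a plain-Java model of the composite key a secondary sort relies on: account number plus date, ordered by account ascending and then date descending. The class name `AccountDateKey` is hypothetical, and in a Hadoop job the same comparison would live in a `WritableComparable` key or a sort comparator rather than in `java.lang.Comparable`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical composite key: natural key (account) plus the field to sort on (date).
class AccountDateKey implements Comparable<AccountDateKey> {
    final long accNo;
    final String date; // assumed yyyy-MM-dd, so string order equals date order

    AccountDateKey(long accNo, String date) {
        this.accNo = accNo;
        this.date = date;
    }

    // Account ascending, then date descending: the newest record comes first per account.
    @Override
    public int compareTo(AccountDateKey other) {
        int byAccount = Long.compare(accNo, other.accNo);
        if (byAccount != 0) {
            return byAccount;
        }
        return other.date.compareTo(date); // reversed operands give descending order
    }

    @Override
    public String toString() {
        return accNo + " " + date;
    }

    public static void main(String[] args) {
        List<AccountDateKey> keys = new ArrayList<>();
        keys.add(new AccountDateKey(13631, "2015-09-16"));
        keys.add(new AccountDateKey(13611, "2015-09-18"));
        keys.add(new AccountDateKey(13631, "2015-09-18"));
        keys.add(new AccountDateKey(13611, "2015-09-19"));
        Collections.sort(keys);
        keys.forEach(System.out::println);
        // 13611 2015-09-19, 13611 2015-09-18, 13631 2015-09-18, 13631 2015-09-16
    }
}
```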
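```java
import java.io.*;
import org.apache.hadoop.io.*;
public class BalanceDate implements Writable {
    LongWritable balance = new LongWritable();
    Text date = new Text();
    public void readFields(DataInput dataInput) throws IOException {
        balance.readFields(dataInput); // read in the same order write() used
        date.readFields(dataInput);
    }
    public void write(DataOutput dataOutput) throws IOException {
        balance.write(dataOutput);
        date.write(dataOutput);
    }
}
```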
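`write()` and `readFields()` must handle the fields in the same order, because Hadoop rebuilds the object by calling `readFields()` on exactly the bytes `write()` produced. The sketch below shows that contract with plain `java.io` streams; `BalanceDateModel` and `roundTrip` are hypothetical stand-ins, and `writeUTF` is not the byte format Hadoop's `Text` uses, so this only illustrates the ordering rule.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical plain-Java stand-in for BalanceDate, used only to show the write/readFields contract.
class BalanceDateModel {
    long balance;
    String date = "";

    void write(DataOutput out) throws IOException {
        out.writeLong(balance);
        out.writeUTF(date);
    }

    // Must read the fields in exactly the order write() emitted them.
    void readFields(DataInput in) throws IOException {
        balance = in.readLong();
        date = in.readUTF();
    }

    // Serializes src and reads it back into a fresh instance.
    static BalanceDateModel roundTrip(BalanceDateModel src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            BalanceDateModel copy = new BalanceDateModel();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BalanceDateModel bd = new BalanceDateModel();
        bd.balance = 1500;
        bd.date = "2015-09-19";
        BalanceDateModel copy = roundTrip(bd);
        System.out.println(copy.balance + " " + copy.date); // prints "1500 2015-09-19"
    }
}
```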
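Then write a custom comparator for the composite key (for example a subclass of `WritableComparator`, over a key that implements `WritableComparable`) that sorts by account number and then by date in descending order, and register it in the job configuration together with a grouping comparator that compares only the account number. In the reducer, read just the first value of each list, which is the most recent transaction.
```java
job.setSortComparatorClass(DateComparator.class);                // account ascending, date descending
job.setGroupingComparatorClass(AccountGroupingComparator.class); // account number only (hypothetical class name)
```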
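The following plain-Java simulation shows how the two comparators divide the work: the sort comparator orders full records, the grouping comparator decides where one reduce() group ends, and the "reducer" keeps only the first record of each group. It does not use the Hadoop API; `ShuffleSimulation`, `GROUP`, `SORT` and `latestPerAccount` are hypothetical names, and records are assumed to be `accno,bal,yyyy-MM-dd` strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of Hadoop's sort + grouping steps on "accno,bal,yyyy-MM-dd" records.
class ShuffleSimulation {
    static long acc(String record) { return Long.parseLong(record.split(",")[0].trim()); }
    static String date(String record) { return record.split(",")[2].trim(); }

    // Grouping comparator: only the account number decides where one reduce() group ends.
    static final Comparator<String> GROUP = Comparator.comparingLong(ShuffleSimulation::acc);

    // Sort comparator: the grouping order refined by date, newest first.
    static final Comparator<String> SORT =
            GROUP.thenComparing(ShuffleSimulation::date, Comparator.reverseOrder());

    // Sorts all records, then keeps the first record of each account group.
    static List<String> latestPerAccount(List<String> records) {
        List<String> sorted = new ArrayList<>(records);
        sorted.sort(SORT);
        List<String> result = new ArrayList<>();
        String groupHead = null;
        for (String r : sorted) {
            if (groupHead == null || GROUP.compare(groupHead, r) != 0) {
                groupHead = r; // first record of a new group = newest transaction
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "13611,3360,2015-09-18", "13611,1500,2015-09-19",
                "13620,10000,2015-09-17", "13620,6000,2015-09-18", "13620,3000,2015-09-19",
                "13631,5000,2015-09-16", "13631,3500,2015-09-18",
                "13621,3000,2015-09-10", "13621,1800,2015-09-15");
        latestPerAccount(input).forEach(System.out::println);
    }
}
```

On the sample input this prints the four rows of the expected output, ordered by account number (13611, 13620, 13621, 13631). Building `SORT` as a refinement of `GROUP` keeps the two comparators consistent, which secondary sort requires: records that the grouping comparator treats as equal must be adjacent after sorting.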