How to get the record with the latest date per key using MapReduce



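I recently started learning MapReduce programming, so I picked a small scenario to practice on. I have sample data with an account number, a balance, and a transaction date, and I want the most recent transaction for each account number.

This is my input: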

+-------+-------+------------+
| accno | bal   | date       |
+-------+-------+------------+
| 13611 |  3360 | 2015-09-18 |
| 13611 |  1500 | 2015-09-19 |
| 13620 | 10000 | 2015-09-17 |
| 13620 |  6000 | 2015-09-18 |
| 13620 |  3000 | 2015-09-19 |
| 13631 |  5000 | 2015-09-16 |
| 13631 |  3500 | 2015-09-18 |
| 13621 |  3000 | 2015-09-10 |
| 13621 |  1800 | 2015-09-15 |
+-------+-------+------------+

Expected output:

    +-------+-------+------------+
    | accno | bal   | Date       |
    +-------+-------+------------+
    | 13611 |  1500 | 2015-09-19 |
    | 13620 |  3000 | 2015-09-19 |
    | 13631 |  3500 | 2015-09-18 |
    | 13621 |  1800 | 2015-09-15 |
    +-------+-------+------------+

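I am trying to write the code, but I am stuck on how to get the latest date for a particular key. My process is:

1) I read the input and emit acc_no as the key and the whole line of text as the value.

2) Then I partition the data on the key (i.e. acc_no).

3) In the reduce phase, I implement the logic to get the record with the latest date.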

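// Driver code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EmployeeDriver extends Configured implements Tool{
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());
//      job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(EmployessPartitioner.class);
        job.setReducerClass(EmployeeReducer.class);
        job.setNumReduceTasks(4);
        // Backslashes in Java string literals must be escaped
        FileInputFormat.addInputPath(job, new Path("D:\\datatoload\\HotelCloutMap\\Emplyoee\\Input\\in.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\datatoload\\HotelCloutMap\\Emplyoee\\output"));
        return job.waitForCompletion(true)? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new EmployeeDriver(), args));
    }
}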

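// Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    LongWritable l = null;
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        // Input lines are comma-separated: accno,bal,date
        String[] line = value.toString().split(",");
        l = new LongWritable(Long.parseLong(line[0]));
        System.out.println(key + " "+ value);
        // Emit the account number as key and the whole line as value
        context.write(l, value);
    }
}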

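// Reducer

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer<LongWritable, Text, LongWritable, Text>{
    @Override
    public void reduce(LongWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
        int cnt =0;
        String date ="",sal = "";
        // Each iteration overwrites sal and date, so whichever value is iterated last is emitted
        for(Text val : value){
             String [] str = val.toString().split(",");
             sal = str[1];
             date = str[2];
        }
        context.write(key, new Text(sal+" "+date));
    }
}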

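// Partitioner

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class EmployessPartitioner extends Partitioner<LongWritable, Text>{
    @Override
    public int getPartition(LongWritable key, Text value, int numOfReduceTasks) {
        // The account number is the first comma-separated field of the value
        String[] line = value.toString().split(",");
        int acc_no = Integer.parseInt(line[0]);
        if(numOfReduceTasks == 0)
        {
            return 0;
        }
        // One partition per known account; everything else goes to partition 3
        if(acc_no == 13611)
            return 0;
        else if(acc_no == 13620)
            return 1;
        else if(acc_no == 13631)
            return 2;
        else
            return 3;
    }
}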

My program gives me output like this:

13611   3360 2015-09-18
13620   10000 2015-09-17
13631   5000 2015-09-16
13621   3000 2015-09-10

So how do I get the latest record for each account number? Thanks in advance.

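You can achieve this with secondary sorting: you need to write a custom grouping comparator so that the values are sorted for you. The sorting happens before the data is passed to the Reducer for processing. Secondary sort orders the values that belong to a key, and that sorted list is what the Reducer receives.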
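To make the idea concrete, here is a minimal sketch that simulates the effect of secondary sort in plain Java, without any Hadoop classes. It assumes the dates are in yyyy-MM-dd form (so they sort correctly as strings), and the names `SecondarySortSketch`, `Record`, and `latestPerAccount` are made up for illustration. It is not the Hadoop wiring itself, only the ordering and grouping logic that the framework would apply between map and reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of secondary sort; no Hadoop classes are used.
// SecondarySortSketch and Record are hypothetical names for illustration.
class SecondarySortSketch {

    // One input row: account number, balance, ISO-8601 date (yyyy-MM-dd).
    static final class Record {
        final long accNo;
        final long balance;
        final String date;

        Record(long accNo, long balance, String date) {
            this.accNo = accNo;
            this.balance = balance;
            this.date = date;
        }

        @Override
        public String toString() {
            return accNo + "," + balance + "," + date;
        }
    }

    // Plays the role of the sort step: account ascending, then date descending.
    // ISO dates compare correctly as plain strings.
    static final Comparator<Record> BY_ACCOUNT_THEN_LATEST_DATE =
            Comparator.comparingLong((Record r) -> r.accNo)
                      .thenComparing((Record r) -> r.date, Comparator.reverseOrder());

    // Plays the role of grouping plus reduce: after sorting, the first record
    // seen for each account is the one with the latest date.
    static Map<Long, Record> latestPerAccount(List<Record> input) {
        List<Record> sorted = new ArrayList<>(input);
        sorted.sort(BY_ACCOUNT_THEN_LATEST_DATE);
        Map<Long, Record> result = new LinkedHashMap<>();
        for (Record r : sorted) {
            result.putIfAbsent(r.accNo, r); // first value per key wins
        }
        return result;
    }

    public static void main(String[] args) {
        List<Record> input = Arrays.asList(
                new Record(13611, 3360, "2015-09-18"),
                new Record(13611, 1500, "2015-09-19"),
                new Record(13620, 10000, "2015-09-17"),
                new Record(13620, 3000, "2015-09-19"),
                new Record(13620, 6000, "2015-09-18"));
        latestPerAccount(input).values().forEach(System.out::println);
    }
}
```

In a real job the same ordering would come from the comparators configured on the job rather than from an in-memory sort, so the reducer never has to hold all values for a key at once.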

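// Value type carrying the balance and the date of one transaction
public class BalanceDate implements Writable {
   LongWritable balance = new LongWritable();
   Text date = new Text();
   // Read the fields back in the same order they were written
   public void readFields(DataInput dataInput) throws IOException {
      balance.readFields(dataInput);
      date.readFields(dataInput);
   }
   // Serialize the balance first, then the date
   public void write(DataOutput dataOutput) throws IOException {
      balance.write(dataOutput);
      date.write(dataOutput);
   }
}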

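Write a custom comparator over BalanceDate (or Text); Hadoop expects a RawComparator here, which is usually done by extending WritableComparator. Set the class in the job configuration. Sort on the date part of the value in descending order, then read only the first value of the list in the reducer.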

job.setGroupingComparatorClass(DateComparator.class);
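If each account only has a handful of records, a simpler alternative to secondary sort (a different technique from the one described above) is to scan the values in the reducer and keep the one with the greatest date. The sketch below shows just that comparison logic in plain Java so it can be run without Hadoop. It assumes comma-separated lines of the form accno,bal,date with yyyy-MM-dd dates, and `LatestRecordSketch` and `latest` are hypothetical names. Inside a real reducer you would pass it the string form of each `Text` value (via `toString()`), since Hadoop reuses the same `Text` instance across iterations.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;

// Hadoop-free sketch of the reduce-side logic; LatestRecordSketch is a hypothetical name.
class LatestRecordSketch {

    // Given all "accno,bal,date" lines of one account, returns "bal date" for the
    // line with the most recent date, or null when there are no lines.
    static String latest(Iterable<String> lines) {
        LocalDate bestDate = null;
        String bestBalance = null;
        for (String line : lines) {
            String[] fields = line.split(",");
            LocalDate date = LocalDate.parse(fields[2].trim()); // expects yyyy-MM-dd
            // Keep this record only if it is newer than the best one seen so far
            if (bestDate == null || date.isAfter(bestDate)) {
                bestDate = date;
                bestBalance = fields[1].trim();
            }
        }
        return bestDate == null ? null : bestBalance + " " + bestDate;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList(
                "13620,10000,2015-09-17",
                "13620,6000,2015-09-18",
                "13620,3000,2015-09-19");
        System.out.println(latest(values)); // prints "3000 2015-09-19"
    }
}
```

The trade-off is that this does one pass over every value for a key, whereas secondary sort lets the reducer stop after the first value.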
