Hadoop二级排序——使用还是不使用



我有交通数据分析的事故输入数据。其中一些列是:

事故编号,事故日期,星期几

1,1/1/1979,5(星期四)

2,1/2/1979,6(星期五)

.......

3,1/1/1980, 0(星期日)

我正在尝试解决以下问题:

找出每年每天的事故数量

所以输出应该看起来像:

其中键是(年份, 星期几),

值(Value)是当天的事故数量。这里第1行表示:年份=1979,星期几=星期日,事故数量=500,以此类推。

1979,1     500
1979,2    1500
1979,3    2500
1979,4    3500
1979,5    4500
1979,6    5500
1979,7    6500
1980,1     500
1980,2    1500
1980,3    2500
1980,4    3500
1980,5    4500

在这种情况下,我试图使用二次排序方法来解决它。这是解决问题的正确方法吗?

如果二级排序是正确的方式,那它对我并不起作用。下面是键类、mapper 和 reducer,但我的输出与预期不符。请帮忙……

/**
 * Composite MapReduce key holding a year and a day-of-week, both stored as {@link Text}.
 *
 * <p>The natural order is year first, then day, both ascending according to
 * {@link Text}'s own ordering, so that records come out as
 * {@code 1979,1}, {@code 1979,2}, ... {@code 1980,1}, ...
 */
public class DOW implements WritableComparable<DOW> {
    private Text year;
    private Text day;

    /** Creates a key with empty year and day; {@link #readFields} fills them in place. */
    public DOW() {
        this.year = new Text();
        this.day = new Text();
    }

    /**
     * Creates a key from the given components. The arguments are stored by reference,
     * not copied.
     *
     * @param year the year component
     * @param day  the day-of-week component
     */
    public DOW(Text year, Text day) {
        this.year = year;
        this.day = day;
    }

    /** @return the year component (the live object, not a copy) */
    public Text getYear() {
        return this.year;
    }

    /** @param year the new year component, stored by reference */
    public void setYear(Text year) {
        this.year = year;
    }

    /** @return the day-of-week component (the live object, not a copy) */
    public Text getDay() {
        return this.day;
    }

    /** @param day the new day-of-week component, stored by reference */
    public void setDay(Text day) {
        this.day = day;
    }

    /** Reads the year and then the day, in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year.readFields(in);
        day.readFields(in);
    }

    /** Writes the year followed by the day. */
    @Override
    public void write(DataOutput out) throws IOException {
        year.write(out);
        day.write(out);
    }

    /**
     * Orders keys by year, then by day, both ascending.
     *
     * <p>The day used to be compared in reverse, which produced days in descending
     * order inside each year; the desired report lists them ascending.
     *
     * @param o the key to compare against
     * @return negative, zero or positive as this key sorts before, equal to or after {@code o}
     */
    @Override
    public int compareTo(DOW o) {
        int cmp = year.compareTo(o.year);
        if (cmp != 0) {
            return cmp;
        }
        return day.compareTo(o.day);
    }

    /** @return the key rendered as {@code year,day} */
    @Override
    public String toString() {
        return year + "," + day;
    }

    /** Two keys are equal when both their year and their day are equal. */
    @Override
    public boolean equals(Object o) {
        if (o instanceof DOW) {
            DOW tp = (DOW) o;
            return year.equals(tp.year) && day.equals(tp.day);
        }
        return false;
    }

    /** Combines the hash codes of both components, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return year.hashCode() * 163 + day.hashCode();
    }
}
public class AccidentDowDemo extends Configured implements Tool {
    public static class DOWMapper extends Mapper<LongWritable, Text, DOW, IntWritable> {
        private static final Logger sLogger = Logger.getLogger(DOWMapper.class);
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            if (value.toString().contains(",")) {
                String[] array = value.toString().split(",");
                if (!array[9].equals("Date")) {
                    Date dt = null;
                    try {
                        dt = new SimpleDateFormat("dd/mm/yyyy").parse(array[9]);
                    } catch (ParseException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    int year = dt.getYear();
                    int day = Integer.parseInt(array[10].toString());
                                        context.write(new DOW(new Text(Integer.toString(year)),
                            new Text(Integer.toString(day))),
                            new IntWritable(1));
                }
            }
        };
    }
    public static class DOWReducer extends Reducer<DOW, IntWritable, DOW, IntWritable> {
        private static final Logger sLogger = Logger
                .getLogger(DOWReducer.class);
        @Override
        protected void reduce(DOW key, Iterable<IntWritable> values,
                Context context) throws java.io.IOException,
                InterruptedException {
            int count = 0;
            sLogger.info("key =" + key);
            for (IntWritable x : values) {
                int val = Integer.parseInt(x.toString());
                count = count + val;
            }
            context.write(key, new IntWritable(count));
        };
    }
    public static class FirstPartitioner extends Partitioner<DOW, IntWritable> {
        @Override
        public int getPartition(DOW key, IntWritable value, int numPartitions) {
            // TODO Auto-generated method stub
            return Math.abs(Integer.parseInt(key.getYear().toString()) * 127)
                    % numPartitions;
        }
    }
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(DOW.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            // TODO Auto-generated method stub
            DOW ip1 = (DOW) w1;
            DOW ip2 = (DOW) w2;
            int cmp = ip1.getYear().compareTo(ip2.getYear());
            if (cmp == 0) {
                cmp = -1 * ip1.getDay().compareTo(ip2.getDay());
            }
            return cmp;
        }
    }
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(DOW.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            // TODO Auto-generated method stub
            DOW ip1 = (DOW) w1;
            DOW ip2 = (DOW) w2;
            return ip1.getYear().compareTo(ip2.getYear());
        }
    }
}

如果你基本上只是想模拟下面这条 SQL 查询:

select year, day, count(*) as totalPerDay from DATA group by year, day

不需要二次排序。

但是如果您需要生成像CUBE这样的东西,需要在一个MR作业中计算每年和每周的总数,那么二级排序是可行的。

这算是一种二级排序,但又不完全是。问题出在 GroupComparator 上:这里必须同时按年份和星期几进行比较。只按年份比较的 GroupComparator,其思想是确保同一年的数据进入同一个 reducer,但这里我们并不需要这样;相反,只有年份和星期几都相同(例如 1979 年、星期日)的数据才必须进入同一个 reducer。代码应该像下面这样。

package accidentexercise;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator for {@code TextpairWritable} keys: compares the year first and,
 * for equal years, the day in reversed order.
 */
public class ClassGroupComparator extends WritableComparator
{
    /** Passes {@code TextpairWritable.class} and {@code true} to the base comparator. */
    protected ClassGroupComparator()
    {
        super(TextpairWritable.class,true);
    }

    /**
     * @return the year comparison when the years differ, otherwise the negated
     *         day comparison
     */
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable w,WritableComparable w1)
    {
        final TextpairWritable left=(TextpairWritable)w;
        final TextpairWritable right=(TextpairWritable)w1;
        final int byYear=left.year.compareTo(right.year);
        if(byYear!=0)
        {
            return byYear;
        }
        // Same year: the day decides, in reverse.
        return -left.day.compareTo(right.day);
    }
}

下面贴出我的完整代码。

TextpairWritable:
package accidentexercise;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite MapReduce key made of a year and a day, both stored as {@link Text}.
 *
 * <p>The fields are package-private because the comparators in this package read
 * them directly.
 */
public class TextpairWritable implements WritableComparable<TextpairWritable>
{
    Text year;
    Text day;

    /** Creates a key with empty year and day; {@link #readFields} fills them in place. */
    public TextpairWritable()
    {
        this.year=new Text();
        this.day=new Text();
    }

    /** Creates a key that stores the given {@link Text} objects by reference. */
    public TextpairWritable(Text year,Text day)
    {
        this.year=year;
        this.day=day;
    }

    /** Creates a key from plain strings. */
    public TextpairWritable(String year,String day)
    {
        this.year=new Text(year);
        this.day=new Text(day);
    }

    /**
     * Copy constructor. The components are copied so that later changes to
     * {@code o} (for example a subsequent {@code readFields} on it) do not alter
     * this key; previously both keys shared the same {@link Text} objects.
     */
    public TextpairWritable(TextpairWritable o)
    {
        this.year=new Text(o.year);
        this.day=new Text(o.day);
    }

    /** Replaces both components; the arguments are stored by reference. */
    public void set(Text year,Text day)
    {
        this.year=year;
        this.day=day;
    }

    /** @return the year component (the live object, not a copy) */
    public Text getyear()
    {
        return this.year;
    }

    /** @return the day component (the live object, not a copy) */
    public Text getday()
    {
        return this.day;
    }

    /** Reads the year and then the day, in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year.readFields(in);
        day.readFields(in);
    }

    /** Writes the year followed by the day. */
    @Override
    public void write(DataOutput out) throws IOException {
        year.write(out);
        day.write(out);
    }

    /** @return the key rendered as {@code year day} */
    @Override
    public String toString()
    {
        return year+" "+day;
    }

    /**
     * Orders keys by year and then by day.
     *
     * <p>The previous version compared this key's year with its own day and its day
     * with itself, so the argument was never consulted.
     *
     * @param o the key to compare against
     * @return negative, zero or positive as this key sorts before, equal to or after {@code o}
     */
    public int compareTo(TextpairWritable o)
    {
        int cmp=year.compareTo(o.year);
        if(cmp==0)
        {
            cmp=day.compareTo(o.day);
        }
        return cmp;
    }

    /** Two keys are equal when both their year and their day are equal. */
    @Override
    public boolean equals(Object o)
    {
        if(this==o)
        {
            return true;
        }
        if(!(o instanceof TextpairWritable))
        {
            return false;
        }
        TextpairWritable other=(TextpairWritable)o;
        return year.equals(other.year)&&day.equals(other.day);
    }

    /** Combines the hash codes of both components, consistent with {@link #equals}. */
    @Override
    public int hashCode()
    {
        return year.hashCode()*163+day.hashCode();
    }
}
GroupComparator:

package accidentexercise;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator for {@code TextpairWritable} keys. Keys compare equal only
 * when both their year and their day compare equal.
 */
public class ClassGroupComparator extends WritableComparator
{
    /** Hands {@code TextpairWritable.class} and {@code true} to the superclass constructor. */
    protected ClassGroupComparator()
    {
        super(TextpairWritable.class,true);
    }

    /**
     * Compares by year; on a tie the result is the day comparison multiplied by -1.
     */
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable w,WritableComparable w1)
    {
        TextpairWritable first=(TextpairWritable)w;
        TextpairWritable second=(TextpairWritable)w1;
        int yearOrder=first.year.compareTo(second.year);
        return yearOrder!=0
                ? yearOrder
                : -1*first.day.compareTo(second.day);
    }
}
SortComparator:
package accidentexercise;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for {@code TextpairWritable} keys: year in its natural order,
 * then day in reversed order.
 */
public class ClassSortComparator extends WritableComparator
{
    /** Hands {@code TextpairWritable.class} and {@code true} to the superclass constructor. */
    protected ClassSortComparator()
    {
        super(TextpairWritable.class,true);
    }

    /**
     * @return the year comparison if the years differ; otherwise the day comparison
     *         with its sign flipped
     */
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable w,WritableComparable w1)
    {
        final TextpairWritable lhs=(TextpairWritable)w;
        final TextpairWritable rhs=(TextpairWritable)w1;
        final int yearResult=lhs.year.compareTo(rhs.year);
        if(yearResult!=0)
        {
            return yearResult;
        }
        final int dayResult=lhs.day.compareTo(rhs.day);
        return -dayResult;
    }
}
Mapper:
package accidentexercise;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Maps one comma-separated line {@code date,accidents,day} to
 * {@code (TextpairWritable(year, day), accidents)}.
 */
public class ClassMapper extends Mapper<LongWritable,Text,TextpairWritable,IntWritable>
{
    /** Created once per class instead of once per input record. */
    private static final Logger LOG=LoggerFactory.getLogger(ClassMapper.class);

    /**
     * Parses the line and emits the accident count keyed by year and day.
     * Records with fewer than three fields, an invalid {@code dd/MM/yyyy} date or a
     * non-numeric count are logged and skipped.
     */
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
    {
        String[] orig_data=value.toString().split(",");
        if(orig_data.length<3)
        {
            // Without this guard a short line would throw ArrayIndexOutOfBoundsException.
            LOG.info("Skipping record with fewer than 3 fields: "+value);
            return;
        }
        SimpleDateFormat df=new SimpleDateFormat("dd/MM/yyyy");
        df.setLenient(false);
        try
        {
            Date date=df.parse(orig_data[0].trim());
            // Take the year from the parsed date: a fixed substring(6, 10) breaks on
            // dates that are not zero-padded, such as 1/3/2014.
            String myyear=new SimpleDateFormat("yyyy").format(date);
            int accidents=Integer.parseInt(orig_data[1].trim());
            // Trim the day so that " 2" and "2" end up in the same key.
            context.write(new TextpairWritable(new Text(myyear),new Text(orig_data[2].trim())),new IntWritable(accidents));
        }
        catch(ParseException e)
        {
            LOG.info("Date is not correct"+e);
        }
        catch(NumberFormatException e)
        {
            // Treat a bad count like a bad date: skip the record, keep the task alive.
            LOG.info("Number of accidents is not correct"+e);
        }
    }
}
Reducer:
package accidentexercise;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
/** Adds up the values of each key group and emits one total per key. */
public class ClassReducer extends Reducer<TextpairWritable,IntWritable,TextpairWritable,IntWritable>
{
    /**
     * Sums every {@link IntWritable} in {@code value} and writes {@code (key, sum)}.
     */
    public void reduce(TextpairWritable key,Iterable<IntWritable> value,Context context) throws IOException,InterruptedException
    {
        int total=0;
        for(IntWritable partial:value)
        {
            total=total+partial.get();
        }
        IntWritable result=new IntWritable(total);
        context.write(key,result);
    }
}
Driver:
package accidentexercise;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Command-line entry point that configures and runs the accident-count job. */
public class ClassDriver {
    /**
     * Expects exactly two arguments, the input path and the output path. Prints a
     * usage line and exits with -1 otherwise; exits with 0 when the job succeeds and
     * 1 when it does not.
     */
    public static void main(String[] args) throws Exception
    {
        if(args.length!=2)
        {
            System.err.println("Usage: Worddrivernewapi <input path> <output path>");
            System.exit(-1);
        }

        Job job=new Job();
        job.setJarByClass(ClassDriver.class);
        job.setJobName("MyDriver");

        // Input and output locations.
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // Map side, routing and ordering, then reduce side.
        job.setMapperClass(ClassMapper.class);
        job.setPartitionerClass(ClassPartitioner.class);
        job.setSortComparatorClass(ClassSortComparator.class);
        job.setGroupingComparatorClass(ClassGroupComparator.class);
        job.setReducerClass(ClassReducer.class);

        // Types written by the job.
        job.setOutputKeyClass(TextpairWritable.class);
        job.setOutputValueClass(IntWritable.class);

        boolean succeeded=job.waitForCompletion(true);
        System.exit(succeeded?0:1);
    }
}
Partitioner:
package accidentexercise;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/** Chooses the partition of a key from its year alone. */
public class ClassPartitioner extends Partitioner<TextpairWritable,IntWritable>
{
    /**
     * Parses the key's year as an integer, multiplies it by 127, takes the absolute
     * value and reduces it modulo {@code numPartitions}.
     *
     * @throws NumberFormatException if the year text is not an integer
     */
    @Override
    public int getPartition(TextpairWritable tp, IntWritable value, int numPartitions) {
        final int yearNumber = Integer.parseInt(tp.getyear().toString());
        final int spread = Math.abs(yearNumber * 127);
        return spread % numPartitions;
    }

}
样本输入:

日期、Number_of_accidents、天

01/03/2014 18 2

02/03/2014 19 3

03/03/2014 20 4

01/03/2014 1 2

02/03/2014 2 3

03/03/2014 4 4

01/03/2014 8 2

02/03/2014 9 3

03/03/2014 2 4

输出:

01/03/2014 2 27

02/03/2014 3 30

03/03/2014 4 26

相关内容

  • 没有找到相关文章

最新更新