Hadoop MapReduce：数据求和（Java）



运行我的 MapReduce 作业后，得到如下输出：

User16565   Logins: 1   Orders:1
User16566   Logins: 2   Orders:2
User16567   Logins: 1   Orders:1

一切看起来都很好，但当日志文件有数千个条目时，这种逐用户的输出就没有太大帮助了。有没有办法修改我的代码，对所有用户的"登录"和"订单"分别求和，以便计算两者的差额？

编辑:新问题

日志示例:

2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-

我在代码中发现了一个错误：登录数和订单数不正确。起初输出看起来是对的，但当我手动核对登录和订单记录时，才意识到存在错误。输出：

User73511   Logins: 3   Orders:2
User14253   Logins: 1   Orders:1

应为:

User73511   Logins: 1   Orders:2
User14253   Logins: 1   Orders:0

这是整个代码:

public class UserOrderCount {

    /**
     * Emits one (userId, counts) pair per log line.
     *
     * <p>Lines are expected to look like the sample log:
     * {@code timestamp,feature:<name>,-,<userId>,-,-,-,-}. The login counter is 1 when the
     * feature field contains {@code feature:login}, the order counter is 1 when it contains
     * {@code feature:order-created}, and both are 0 otherwise.
     */
    public static class SingleUserMapper extends
            Mapper<LongWritable, Text, Text, CountInformationTuple> {
        private static final String FEATURE_LOGIN = "feature:login";
        private static final String FEATURE_ORDER = "feature:order-created";
        private static final int FEATURE_FIELD = 1;
        private static final int USER_FIELD = 3;

        // Reused across map() calls to avoid per-record allocation, so every field of the
        // tuple must be assigned on every call.
        private final Text outUserId = new Text();
        private final CountInformationTuple outCountOrder = new CountInformationTuple();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] singleUserData = value.toString().split(",");
            if (singleUserData.length <= USER_FIELD) {
                // Blank or malformed line: skip it instead of failing the whole task.
                return;
            }
            String featureId = singleUserData[FEATURE_FIELD];

            // Set BOTH counters explicitly. The tuple is reused, so a counter set to 1 for a
            // previous record would otherwise leak into this one (the cause of inflated counts).
            outCountOrder.setCountLogin(featureId.contains(FEATURE_LOGIN) ? 1 : 0);
            outCountOrder.setCountOrder(featureId.contains(FEATURE_ORDER) ? 1 : 0);

            outUserId.set(singleUserData[USER_FIELD]);
            context.write(outUserId, outCountOrder);
        }
    }

    /**
     * Sums the login and order counters for one user key and writes the totals.
     * Used as both combiner and reducer by {@link #main(String[])}; summation is
     * associative and commutative, so partial sums combine correctly.
     */
    public static class SingleUserReducer extends
            Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
        private final CountInformationTuple result = new CountInformationTuple();

        @Override
        public void reduce(Text key, Iterable<CountInformationTuple> values,
                Context context) throws IOException, InterruptedException {
            int login = 0;
            int order = 0;
            for (CountInformationTuple val : values) {
                login += val.getCountLogin();
                order += val.getCountOrder();
            }
            result.setCountLogin(login);
            result.setCountOrder(order);
            context.write(key, result);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args generic Hadoop options followed by exactly two paths: input and output
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: UserOrderCount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf);
        job.setJobName("UserOrderCount");
        job.setJarByClass(UserOrderCount.class);
        job.setMapperClass(SingleUserMapper.class);
        job.setCombinerClass(SingleUserReducer.class);
        job.setReducerClass(SingleUserReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountInformationTuple.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Mutable pair of login/order counters, serialized as two ints (login, then order). */
    public static class CountInformationTuple implements Writable {
        private int countOrder = 0;
        private int countLogin = 0;

        public int getCountOrder() {
            return countOrder;
        }

        public void setCountOrder(int order) {
            this.countOrder = order;
        }

        public int getCountLogin() {
            return countLogin;
        }

        public void setCountLogin(int login) {
            this.countLogin = login;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Must read in exactly the order write() emits; reading order first swapped the
            // two counters on every shuffle/combine round-trip.
            countLogin = in.readInt();
            countOrder = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(countLogin);
            out.writeInt(countOrder);
        }

        @Override
        public String toString() {
            return "Logins: " + countLogin + "\t" + "Orders:" + countOrder;
        }
    }
}

供感兴趣的人参考：下面的代码解决了我的"输出错误"问题。

/**
 * Emits (userId, counts) for one log line, where the login or order counter is 1 when the
 * feature field is exactly {@code feature:login} or {@code feature:order-created}.
 * Lines with fewer than four comma-separated fields are skipped.
 */
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] stringData = value.toString().split(",");
    if (stringData.length <= 3) {
        // Blank or malformed line: skip it rather than failing the task.
        return;
    }
    String userId = stringData[3];
    String featureId = stringData[1];
    // Exact comparison. String.matches would compile a regex on every record; the patterns
    // contain no regex metacharacters, so equals() accepts exactly the same strings.
    int login = "feature:login".equals(featureId) ? 1 : 0;
    int order = "feature:order-created".equals(featureId) ? 1 : 0;
    outUserID.set(userId);
    // Both counters are assigned on every call, so the reused output tuple never carries
    // values from a previous record.
    outUserCount.set(login, order);
    context.write(outUserID, outUserCount);
}
public static class UserCountTuple implements Writable {
        private IntWritable countLogin;
        private IntWritable countOrder;
        public UserCountTuple() {
            set(new IntWritable(0), new IntWritable(0));
        }
        public void set(int countLogin, int countOrder) {
            this.countLogin.set(countLogin);
            this.countOrder.set(countOrder);
        }
        public void set(IntWritable countLogin, IntWritable countOrder) {
            this.countLogin = countLogin;
            this.countOrder = countOrder;
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            countLogin.readFields(in);
            countOrder.readFields(in);
        }
        @Override
        public void write(DataOutput out) throws IOException {
            countLogin.write(out);
            countOrder.write(out);
        }
        public IntWritable getLogin() {
            return countLogin;
        }
        public IntWritable getOrder() {
            return countOrder;
        }
        @Override
        public String toString() {
            return "Logins: " + countLogin + "t" + "Orders:" + countOrder;
        }
    }

如果您希望只得到一个输出文件（即一个全局总和），可以使用 jobConf.setNumReduceTasks(1) 将 MapReduce 作业配置为只使用一个 reduce 任务（使用新 API 时为 job.setNumReduceTasks(1)），详情请参阅 JobConf 的 JavaDoc。

这样，唯一的 reduce 任务将收到所有用户的 login/order 计数。您只需在 reduce 任务中对已处理记录的 login 和 order 值分别累加，然后在 cleanup() 方法中输出这两个总和；cleanup() 在单个 reduce 任务处理完所有输入记录后只调用一次。示例代码：

public static class SingleUserReducer extends
        Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
    private CountInformationTuple result = new CountInformationTuple();
    private int login = 0;
    private int order = 0;
    public void reduce(Text key, Iterable<CountInformationTuple> values,
            Context context) throws IOException, InterruptedException {
        for (CountInformationTuple val : values) {
            login += val.getCountLogin();
            order += val.getCountOrder();
        }
    }
    public void cleanup(Context context) throws IOException, InterruptedException {
        result.setCountLogin(login);
        result.setCountOrder(order);
        context.write(new Text("total"), result);
    }
}

输出将只有一条记录，其中包含 login 和 order 的总和。如有需要，可以修改 cleanup() 方法来计算两者的差额及其他指标。

最新更新