After running my MapReduce job, this is the output:
User16565 Logins: 1 Orders:1
User16566 Logins: 2 Orders:2
User16567 Logins: 1 Orders:1
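Everything looks fine, but this is not much help when the log file has thousands of entries. Is there a way to change my code so that it sums up the "Logins" and "Orders", so I can calculate the difference?
Edit: new problem
Log sample: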
2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-
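I found a bug in my code. The login and order counts are not correct. At first the output seemed right, but when I checked the logins and orders manually I realized there was an error. Output: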
User73511 Logins: 3 Orders:2
User14253 Logins: 1 Orders:1
Expected:
User73511 Logins: 1 Orders:2
User14253 Logins: 1 Orders:0
Here is the whole code:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class UserOrderCount {

    public static class SingleUserMapper extends
            Mapper<LongWritable, Text, Text, CountInformationTuple> {
        private Text outUserId = new Text();
        private CountInformationTuple outCountOrder = new CountInformationTuple();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String tempString = value.toString();
            String[] singleUserData = tempString.split(",");
            String userId = singleUserData[3];
            String featureId = singleUserData[1];
            if (featureId.contains("feature:order-created")) {
                outCountOrder.setCountOrder(1);
            }
            if (featureId.contains("feature:login")) {
                outCountOrder.setCountLogin(1);
            }
            outUserId.set(userId);
            context.write(outUserId, outCountOrder);
        }
    }

    public static class SingleUserReducer extends
            Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
        private CountInformationTuple result = new CountInformationTuple();

        public void reduce(Text key, Iterable<CountInformationTuple> values,
                Context context) throws IOException, InterruptedException {
            int login = 0;
            int order = 0;
            for (CountInformationTuple val : values) {
                login += val.getCountLogin();
                order += val.getCountOrder();
            }
            result.setCountLogin(login);
            result.setCountOrder(order);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: UserOrderCount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf);
        job.setJobName("UserOrderCount");
        job.setJarByClass(UserOrderCount.class);
        job.setMapperClass(SingleUserMapper.class);
        job.setCombinerClass(SingleUserReducer.class);
        job.setReducerClass(SingleUserReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountInformationTuple.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class CountInformationTuple implements Writable {
        private int countOrder = 0;
        private int countLogin = 0;

        public int getCountOrder() {
            return countOrder;
        }

        public void setCountOrder(int order) {
            this.countOrder = order;
        }

        public int getCountLogin() {
            return countLogin;
        }

        public void setCountLogin(int login) {
            this.countLogin = login;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            countOrder = in.readInt();
            countLogin = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(countLogin);
            out.writeInt(countOrder);
        }

        @Override
        public String toString() {
            return "Logins: " + countLogin + "\t" + "Orders:" + countOrder;
        }
    }
}
For anyone interested: this solved my "wrong output" bug.
// outUserID and outUserCount are reusable fields of the mapper (declarations not shown).
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String tempString = value.toString();
    String[] stringData = tempString.split(",");
    String userID = stringData[3];
    String featureID = stringData[1];
    // Counters start at zero for every record, so nothing carries over.
    int login = 0;
    int order = 0;
    if (featureID.equals("feature:login")) {
        login++;
    } else if (featureID.equals("feature:order-created")) {
        order++;
    }
    outUserID.set(userID);
    outUserCount.set(login, order);
    context.write(outUserID, outUserCount);
}
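One likely reason the first mapper overcounted is that it reused a single CountInformationTuple and never reset it, so a count of 1 set for one record was still present when the next record was written. The following is only a plain-Java sketch of the per-record counting idea, without Hadoop; the class name PerRecordCountSketch and the method countPerUser are made up for illustration. It runs the four sample log lines through the same split-and-count logic and sums the results per user.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch (no Hadoop) of per-record counting.
class PerRecordCountSketch {

    // Returns userId -> {logins, orders}.
    static Map<String, int[]> countPerUser(List<String> lines) {
        Map<String, int[]> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String featureId = fields[1];
            String userId = fields[3];
            // Fresh counters for each record: nothing leaks over from the previous line.
            int login = 0;
            int order = 0;
            if (featureId.equals("feature:login")) {
                login = 1;
            } else if (featureId.equals("feature:order-created")) {
                order = 1;
            }
            // Plays the role of the reducer: sum the per-record counts by user.
            int[] sums = totals.computeIfAbsent(userId, k -> new int[2]);
            sums[0] += login;
            sums[1] += order;
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-",
            "2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-",
            "2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-",
            "2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-");
        countPerUser(sample).forEach((user, c) ->
            System.out.println(user + "\tLogins: " + c[0] + "\tOrders:" + c[1]));
    }
}
```

For the sample lines this gives 1 login and 2 orders for User73511 and 1 login and 0 orders for User14253, which matches the expected output in the question.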
public static class UserCountTuple implements Writable {
    private IntWritable countLogin;
    private IntWritable countOrder;

    public UserCountTuple() {
        set(new IntWritable(0), new IntWritable(0));
    }

    public void set(int countLogin, int countOrder) {
        this.countLogin.set(countLogin);
        this.countOrder.set(countOrder);
    }

    public void set(IntWritable countLogin, IntWritable countOrder) {
        this.countLogin = countLogin;
        this.countOrder = countOrder;
    }

    // Fields are read in the same order in which write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        countLogin.readFields(in);
        countOrder.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        countLogin.write(out);
        countOrder.write(out);
    }

    public IntWritable getLogin() {
        return countLogin;
    }

    public IntWritable getOrder() {
        return countOrder;
    }

    @Override
    public String toString() {
        return "Logins: " + countLogin + "\t" + "Orders:" + countOrder;
    }
}
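A second difference from the original CountInformationTuple is worth noting: there, write() emitted the login count first while readFields() read the order count first, so the two fields swap on every serialization round trip. UserCountTuple reads and writes in the same order. The sketch below illustrates that symmetric pattern with plain java.io streams instead of Hadoop; the names TupleRoundTripSketch, Counts and roundTrip are assumptions made for this example only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical plain-Java sketch: a Writable-style pair whose fields are
// read back in exactly the order in which they were written.
class TupleRoundTripSketch {

    static final class Counts {
        int logins;
        int orders;

        void write(DataOutput out) throws IOException {
            out.writeInt(logins);  // first field written
            out.writeInt(orders);  // second field written
        }

        void readFields(DataInput in) throws IOException {
            logins = in.readInt(); // first field read
            orders = in.readInt(); // second field read
        }
    }

    // Serializes the counts to a byte buffer and reads them into a new object.
    static Counts roundTrip(int logins, int orders) {
        try {
            Counts original = new Counts();
            original.logins = logins;
            original.orders = orders;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            Counts copy = new Counts();
            copy.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Counts copy = roundTrip(1, 2);
        System.out.println("Logins: " + copy.logins + "\tOrders:" + copy.orders);
    }
}
```

If readFields() read the two ints in the opposite order, the copy would come back with logins and orders exchanged.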
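If you want a single output file, you can configure the MapReduce job to use only one reduce task with jobConf.setNumReduceTasks(1) (with the new-API Job object used in the question, that is job.setNumReduceTasks(1)); see the JobConf JavaDoc for more information.
With that setting, your single reduce task receives the login and order counts of every user. You only need to sum the login and order values of all records processed in that reduce task and then emit the sums in the cleanup() method, which is called just once, after the reducer has processed all of its input records. Sample code: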
public static class SingleUserReducer extends
        Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
    private CountInformationTuple result = new CountInformationTuple();
    // Running totals across all keys seen by this reduce task.
    private int login = 0;
    private int order = 0;

    @Override
    public void reduce(Text key, Iterable<CountInformationTuple> values,
            Context context) throws IOException, InterruptedException {
        // Only accumulate here; nothing is written per user.
        for (CountInformationTuple val : values) {
            login += val.getCountLogin();
            order += val.getCountOrder();
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Called once after the last reduce() call: emit the totals.
        result.setCountLogin(login);
        result.setCountOrder(order);
        context.write(new Text("total"), result);
    }
}
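The output is a single record containing the total login and order counts. If needed, you can extend the cleanup() method to compute the difference and other metrics.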
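As a rough illustration of that last point, here is a plain-Java sketch (no Hadoop) of the accumulate-then-emit pattern with a difference added. The class TotalsSketch, its method names, the "Difference" label and the choice of logins minus orders are all assumptions for this example, not part of the answer's code.

```java
// Hypothetical plain-Java sketch of "accumulate in reduce(), emit once in cleanup()".
class TotalsSketch {
    private int logins = 0;
    private int orders = 0;

    // Mirrors reduce(): add one user's counts to the running totals, emit nothing.
    void reduce(int loginCount, int orderCount) {
        logins += loginCount;
        orders += orderCount;
    }

    int getLogins() {
        return logins;
    }

    int getOrders() {
        return orders;
    }

    // Assumption: "difference" means total logins minus total orders.
    int difference() {
        return logins - orders;
    }

    // Mirrors cleanup(): build the single summary record after all input is processed.
    String cleanup() {
        return "total\tLogins: " + logins + "\tOrders:" + orders
                + "\tDifference: " + difference();
    }

    public static void main(String[] args) {
        TotalsSketch totals = new TotalsSketch();
        totals.reduce(1, 2); // User73511, expected counts from the question
        totals.reduce(1, 0); // User14253, expected counts from the question
        System.out.println(totals.cleanup());
    }
}
```

In the real reducer the same arithmetic would go into cleanup() before the context.write call, with the value type extended to carry the extra metric.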