MapReduce: Reduce function is writing strange, unexpected values



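My reduce function in Java is writing values to the output file that I do not expect. I stepped through the code with breakpoints and saw that, for every context.write call I make, the key and value I write are correct. Where am I going wrong?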

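What I am trying to do is take input rows of the form date, customer, vendor, amount, each representing a transaction, and produce a dataset with rows of the form date, user, balance, where the balance is the sum of all transactions in which the user was the customer or the vendor.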

Here is my code:
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Transactions {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, Text> {

        // Input line: date,customer,vendor,amount
        // Output: key = date, value = customer,vendor,amount
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            var splittedValues = value.toString().split(",");
            var date = splittedValues[0];
            var customer = splittedValues[1];
            var vendor = splittedValues[2];
            var amount = splittedValues[3];
            var reduceValue = new Text(customer + "," + vendor + "," + amount);
            context.write(new Text(date), reduceValue);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, Text, Text, Text> {

        // Input: key = date, values = customer,vendor,amount
        // Output: key = date, value = user,balance
        public void reduce(Text key, Iterable<Text> values,
                           Context context
        ) throws IOException, InterruptedException {
            Map<String, Integer> balanceByUserId = new ConcurrentHashMap<>();
            values.forEach(transaction -> {
                var splittedTransaction = transaction.toString().split(",");
                var customer = splittedTransaction[0];
                var vendor = splittedTransaction[1];
                var amount = 0;
                if (splittedTransaction.length > 2) {
                    amount = Integer.parseInt(splittedTransaction[2]);
                }
                if (!balanceByUserId.containsKey(customer)) {
                    balanceByUserId.put(customer, 0);
                }
                if (!balanceByUserId.containsKey(vendor)) {
                    balanceByUserId.put(vendor, 0);
                }
                // The customer pays the amount, the vendor receives it.
                balanceByUserId.put(customer, balanceByUserId.get(customer) - amount);
                balanceByUserId.put(vendor, balanceByUserId.get(vendor) + amount);
            });

            balanceByUserId.entrySet().forEach(entry -> {
                var reducerValue = new Text(entry.getKey() + "," + entry.getValue().toString());
                try {
                    context.write(key, reducerValue);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "transactions");
        job.setJarByClass(Transactions.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

"where the balance is the sum of all transactions in which the user was the customer or the vendor"

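balanceByUserId only exists per unique date, because your map key is the date.

If you want to aggregate by customer information (name/ID?), then the customer should be the key of the mapper output.

Once all the data for each customer has been grouped at the reducer, you can sort by date if you want, but aggregate by the other details.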
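
To illustrate the idea of keying by user rather than by date, here is a minimal sketch in plain Java that simulates the map, shuffle and reduce steps without any Hadoop classes. The class name UserBalanceSketch, the method names and the sample rows are all invented for this illustration. It assumes each input line is date,customer,vendor,amount with an integer amount, and that a customer's balance decreases while a vendor's increases, as in the posted reducer.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of map -> shuffle -> reduce; no Hadoop classes are used.
class UserBalanceSketch {

    // "Map" step: one line "date,customer,vendor,amount" becomes two
    // (user, signed amount) pairs, so the user is the key instead of the date.
    static List<Map.Entry<String, Integer>> map(String line) {
        String[] fields = line.split(",");
        String customer = fields[1].trim();
        String vendor = fields[2].trim();
        int amount = Integer.parseInt(fields[3].trim());
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        out.add(new SimpleEntry<>(customer, -amount)); // the customer pays
        out.add(new SimpleEntry<>(vendor, amount));    // the vendor receives
        return out;
    }

    // "Shuffle" and "reduce" steps: group the pairs by user and sum them.
    static Map<String, Integer> balances(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, invented for this illustration.
        List<String> input = List.of(
                "2020-01-01,alice,bob,50",
                "2020-01-01,bob,carol,20",
                "2020-01-02,carol,alice,30");
        System.out.println(balances(input)); // prints {alice=-20, bob=30, carol=-10}
    }
}
```

Because the reduce step here both consumes and produces plain (user, amount) pairs, the same summing logic could in principle be reused as a combiner. In the posted job, by contrast, IntSumReducer is also registered as the combiner, yet it emits user,balance while expecting customer,vendor,amount as input, which may be worth checking as well.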


It is also worth pointing out that this would be easier in Hive or SparkSQL than in MapReduce.