我有一个Mapper<AvroKey<Email>, NullWritable, Text, Text>
,它有效地接收电子邮件并多次吐出电子邮件地址的键和在其中找到的字段的值(from, to, cc等)。
然后我有一个Reducer<Text, Text, NullWritable, Text>
,它接受电子邮件地址和字段名。它输出一个NullWritable键,并计算该地址在给定字段中出现的次数。例如…
{
"address": "joe.bloggs@gmail.com",
"toCount": 12,
"fromCount": 4
}
我使用FileUtil。copyMerge合并来自作业的输出,但(显然)来自不同reducer的结果没有合并,所以在实践中我看到:
{
"address": "joe.bloggs@gmail.com",
"toCount": 12,
"fromCount": 0
}, {
"address": "joe.bloggs@gmail.com",
"toCount": 0,
"fromCount": 4
}
有没有更明智的方法来处理这个问题,这样我就可以得到每个电子邮件地址的单个结果?(我收集一个运行预减少阶段的组合器只运行在数据的一个子集上,不保证给我想要的结果)?
编辑:Reducer代码应该是这样的:
public class EmailReducer extends Reducer<Text, Text, NullWritable, Text> {
private static final ObjectMapper mapper = new ObjectMapper();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Map<String, Map<String, Object>> results = new HashMap<>();
for (Text value : values) {
if (!results.containsKey(value.toString())) {
Map<String, Object> result = new HashMap<>();
result.put("address", key.toString());
result.put("to", 0);
result.put("from", 0);
results.put(value.toString(), result);
}
Map<String, Object> result = results.get(value.toString());
switch (value.toString()) {
case "TO":
result.put("to", ((int) result.get("to")) + 1);
break;
case "FROM":
result.put("from", ((int) result.get("from")) + 1);
break;
}
results.values().forEach(result -> {
context.write(NullWritable.get(), new Text(mapper.writeValueAsString(result)));
});
}
}
减速器的每个输入键对应一个唯一的电子邮件地址,因此不需要results
集合。每次调用reduce
方法时,都是针对一个不同的电子邮件地址,因此我的建议是:
public class EmailReducer extends Reducer<Text, Text, NullWritable, Text> {
private static final ObjectMapper mapper = new ObjectMapper();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Map<String, Object> result = new HashMap<>();
result.put("address", key.toString());
result.put("to", 0);
result.put("from", 0);
for (Text value : values) {
switch (value.toString()) {
case "TO":
result.put("to", ((int) result.get("to")) + 1);
break;
case "FROM":
result.put("from", ((int) result.get("from")) + 1);
break;
}
context.write(NullWritable.get(), new Text(mapper.writeValueAsString(result)));
}
}
我不确定ObjectMapper类做什么,但我认为您需要它来格式化输出。否则,我将打印输入键作为输出键(即电子邮件地址),并将每个电子邮件地址的"from"one_answers"to"字段的两个连接计数打印出来。
如果您的输入是一个数据集合(即,不是流,或类似的),那么您应该只获得每个电子邮件地址一次。如果输入以流的形式给出,并且需要增量地构建最终输出,那么一个作业的输出可以是另一个作业的输入。如果是这种情况,我建议使用MultipleInputs,其中一个Mapper是您前面描述的那个,另一个IdentityMapper将前一个作业的输出转发给Reducer。通过这种方式,同样的电子邮件地址由相同的reduce任务处理。