为什么我的MapReduce作业计数不正确?



我目前正在编写一个MapReduce任务来解析一个数据集并列出具有500+ 5星评级的电影。

为此,我已经有了一个mapreduce作业,用于从电影列表中过滤动作片,还有一个用于过滤电影的5星评论。这个新作业和每个映射器的输入是一个movieIDS列表。

第一个映射器的输入是一个适用于FOR计数的MovieIDS列表。第二个映射器的输入是一个获得了5颗星评价的movieID。

然后,我的任务是通过计算特定电影(键)获得的5星评论的数量,查看它是否获得了总共500个评论,然后查看该特定电影是否已在适用电影列表中被过滤,从而将这些内容加入到reducer中。

然而,我的问题是,我用来计算特定电影ID的5星评论数量的HashMap数据结构只初始化为1。

代码如下:

public class JoinRatings extends Configured implements Tool {
public static class TokenizerMapperA extends Mapper<Object, Text, Text, Text> {
private Text node1;
private Text node2 = new Text("1");
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//Write movie ID and int writeable 1
node1 = new Text(value.toString());
context.write(node1, node2);
}
}
public static class TokenizerMapperB extends Mapper<Object, Text, Text, Text> {
private Text node1;
private Text node2 = new Text("2");
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
node1 = new Text(value.toString());
context.write(node1, node2);
}
}
public static class CountReducer extends Reducer<Text, Text, Text, NullWritable>{
private Text node1;
private Set<String> distinctNodes;
Map<String, Integer> map;
private final static IntWritable one = new IntWritable(1);
private final static IntWritable two = new IntWritable(2);
@Override
protected void setup(Context context) {
distinctNodes = new HashSet<String>(); 
map = new HashMap<String,Integer>();
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) {
//Add all action movie IDs to unique set distinctNodes
String keyString = key.toString().strip();
String valueString = "NULL";
int counter = 0
for (Text text : values) {
String value = text.toString().strip();
// try {
//  Text testText = new Text(value);
//  context.write(testText, NullWritable.get());
// } catch (IOException e) {
//  e.printStackTrace();
// } catch (InterruptedException e) {
//  e.printStackTrace();
// }
if (value.equals("1")) {
distinctNodes.add(keyString);
}
else if (value.equals("2")) {
if (map.containsKey(keyString)) {
map.put(keyString, map.get(keyString) + 1);
} else {
map.put(keyString, 1);
}
}
}


}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
String numOfDistinctNodes = Integer.toString(distinctNodes.size());
context.write(new Text(numOfDistinctNodes), NullWritable.get());
String sizeOfMap = Integer.toString(map.size());
context.write(new Text(sizeOfMap), NullWritable.get());

for(Map.Entry<String, Integer> mapElement : map.entrySet()) {
String test = Integer.toString(mapElement.getValue());
context.write(new Text(test), NullWritable.get());
}
for (String s: distinct nodes) {
if (map.containsKey(s)) {
if (map.get(s) >= 500) {
node1 = new Text(s);
context.write(node1, NullWritable.get());
}
}
}

//Order movieID by ascending
//Write all movie IDs with 500+ 5-star reviews (to context)
}
}

正如您所看到的,我正在使用映射器中的Text值来区分减速器应该添加到哪个数据结构。这似乎对Text & 1"工作得很好,它将适用的电影的5星评级可以计算到一个HashSet,但我的HashMap不计算特定键/MovieID的5星评论的数量,只将值初始化为1。

我的倾向是我不正确地使用MapReduce,并且减速器只获得{MovieID, 2}的单一输入,而不是{MovieID, 2,2,2}。

reducer是分布式的,因此不能用于在每个实例之间存储状态,比如Map。更具体地说,注意key参数是一个实例——每个唯一键得到一个Reducer类实例;并不是所有的键都通过一个实例函数传递。

您可以使用HadoopDistributedCacheCounter来维护跨任务的一些状态,但是您应该已经在单个Iterable<Text>对象中拥有来自相同ID的所有值。例如,所有评论(带有星星计数)按id划分。因此,按value.getStars() == 5过滤。(考虑定义一个可写的Movie类,而不是使用Text),然后定义一个简单的计数器,例如

int fiveStarCount = 0;
for (Text t : values) {
// todo: get stars
fiveStarCount += parseStars(t) == 5 ? 1 : 0;
}
if (fiveStarCount >= 500) {
context.write(movieId, NullWritable.get());  // assuming you only care about the movie id
}

那么,这里就没有什么要排序的了…您需要第二个MapReduce作业来读取输出,然后写入(null,id)元组,然后reducer可以通过将所有数据转储到TreeSet对象中并写回上下文来对值进行排序。


如果你真的需要过滤、连接和排序,我推荐Hive或Spark而不是纯Mapreduce

相关内容

  • 没有找到相关文章

最新更新