Do you know how I could implement this algorithm using the MapReduce paradigm?
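def getFriends(self, degree):
    # Note: the returned list also contains self (the starting user)
    friendList = []
    self._getFriends(degree, friendList)
    return friendList

def _getFriends(self, degree, friendList):
    # Add each user only once; with bidirectional friendships the
    # recursion revisits users (e.g. 1 -> 2 -> 1)
    if self not in friendList:
        friendList.append(self)
    if degree:
        for friend in self.friends:
            friend._getFriends(degree - 1, friendList)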
Suppose we have the following bidirectional friendships:
(1,2),(1,3),(1,4),(4,5),(4,6),(5,7),(5,8)
For example, how do I get the 1st-, 2nd-, and 3rd-degree connections of user 1? The answer should be 1 -> 2, 3, 4, 5, 6, 7, 8.
Thanks.
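For reference, here is a minimal single-machine sketch of the recursive approach run on the example friendships. It assumes the friendships are stored as an adjacency dictionary; build_graph and get_friends are illustrative names, not part of the original code. Results are collected in a set so bidirectional edges do not produce duplicates:

```python
from collections import defaultdict

# Friendships from the question; each pair is bidirectional.
EDGES = [(1, 2), (1, 3), (1, 4), (4, 5), (4, 6), (5, 7), (5, 8)]


def build_graph(edges):
    """Adjacency sets, with every friendship stored in both directions."""
    graph = defaultdict(set)
    for a, b in edges:
        graph[a].add(b)
        graph[b].add(a)
    return graph


def get_friends(graph, user, degree, found=None):
    """Recursive version of getFriends: everyone reachable in <= degree hops.

    Like the original, it may visit a user several times; the set only
    removes duplicates from the result.
    """
    if found is None:
        found = set()
    found.add(user)
    if degree:
        for friend in graph[user]:
            get_friends(graph, friend, degree - 1, found)
    return found


graph = build_graph(EDGES)
for d in (1, 2, 3):
    # Exclude the starting user from the printed circle
    print(d, sorted(get_friends(graph, 1, d) - {1}))
# 1 [2, 3, 4]
# 2 [2, 3, 4, 5, 6]
# 3 [2, 3, 4, 5, 6, 7, 8]
```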
Maybe you could use Hive, which supports SQL-like queries!
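As far as I understand, you want to collect all friends within the n-th circle of a person in a social graph. Most graph algorithms are recursive, and recursion does not fit well with the way MapReduce solves tasks: each job makes a single pass over its input, so every level of recursion has to become a separate job.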
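I'd suggest using Apache Giraph for this (it runs on top of Hadoop). It follows the Pregel model: you write a job that describes the behavior of a single vertex, and vertices communicate by sending messages, for example: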
1. Send a message from the root node to all of its friends to request their friend lists.
2.1. Each friend sends a message with its friend list to the root node.
2.2. Each friend sends a message to all of its sub-friends to request their friend lists.
3.1. Each sub-friend sends a message with its friend list to the root node.
3.2. Each sub-friend sends a message to all of its sub-sub-friends to request their friend lists.
...
N. The root node collects all these messages and merges them into a single list.
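The steps above can be simulated in plain Python to see how the supersteps play out on the example graph. This is not Giraph's API; it is a sketch assuming a Pregel-style model in which messages sent in one superstep are delivered in the next. It also simplifies the scheme slightly: instead of shipping friend lists back to the root, each vertex records the superstep in which it was first reached, and the root gathers those records at the end (pregel_circles is a hypothetical name):

```python
from collections import defaultdict

EDGES = [(1, 2), (1, 3), (1, 4), (4, 5), (4, 6), (5, 7), (5, 8)]


def pregel_circles(edges, root, max_degree):
    """Simulate a vertex-centric (Pregel-style) computation in plain Python.

    In each superstep, every vertex that received a message and has not been
    reached before records the superstep number as its degree and, if more
    hops remain, messages all of its friends. No Giraph API is used here.
    """
    neighbours = defaultdict(set)
    for a, b in edges:
        neighbours[a].add(b)
        neighbours[b].add(a)

    degree_of = {root: 0}
    # Superstep 0: the root messages all of its friends
    inbox = set(neighbours[root])
    superstep = 1
    while inbox and superstep <= max_degree:
        outbox = set()
        for vertex in inbox:
            if vertex in degree_of:
                continue  # already reached in an earlier superstep
            degree_of[vertex] = superstep
            if superstep < max_degree:
                outbox |= neighbours[vertex]
        inbox = outbox  # delivered at the next superstep barrier
        superstep += 1

    # Final step: the root gathers every reached vertex, grouped by degree
    circles = defaultdict(list)
    for vertex, deg in degree_of.items():
        if deg > 0:
            circles[deg].append(vertex)
    return {deg: sorted(vs) for deg, vs in sorted(circles.items())}


print(pregel_circles(EDGES, 1, 3))
# {1: [2, 3, 4], 2: [5, 6], 3: [7, 8]}
```

Grouping by degree makes it easy to return either a single circle or the union of circles 1..N.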
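You can also collect the circles with a chain of MapReduce jobs, but this is not an efficient way to solve the task:
- Export the root user's friends to the file circle-001
- Use circle-001 as the input of a job that exports the friends of every user in circle-001 to circle-002
- Do the same, but with circle-002 as the input
- Repeat N times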
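A minimal sketch of this chain, simulating each MapReduce job in plain Python (map, shuffle, reduce) and storing the circles as files named circle-001, circle-002, ... in a temporary directory. The helper names and the one-user-per-line file format are assumptions, and the reducer also drops users already found in earlier circles, which the steps above leave implicit:

```python
import os
import tempfile
from collections import defaultdict

EDGES = [(1, 2), (1, 3), (1, 4), (4, 5), (4, 6), (5, 7), (5, 8)]


def read_circle(path):
    with open(path) as f:
        return {int(line) for line in f if line.strip()}


def write_circle(path, users):
    with open(path, "w") as f:
        f.writelines(f"{u}\n" for u in sorted(users))


def run_circle_job(edges, in_path, out_path, seen):
    """One simulated MapReduce job: friends of everyone in in_path -> out_path."""
    circle = read_circle(in_path)
    # Map: for each edge (in both directions) whose source is in the circle,
    # emit (friend, source)
    mapped = [(b, a) for x, y in edges for a, b in ((x, y), (y, x)) if a in circle]
    # Shuffle: group values by key, as Hadoop does between map and reduce
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Reduce: keep each friend once, skipping users from earlier circles
    write_circle(out_path, {friend for friend in groups if friend not in seen})


def circles(edges, root, n, workdir):
    # circle-001: the root user's direct friends
    first = {b for x, y in edges for a, b in ((x, y), (y, x)) if a == root}
    path = os.path.join(workdir, "circle-001")
    write_circle(path, first)
    seen = {root} | first
    for k in range(2, n + 1):  # one job per additional degree
        next_path = os.path.join(workdir, f"circle-{k:03d}")
        run_circle_job(edges, path, next_path, seen)
        seen |= read_circle(next_path)
        path = next_path
    return {k: sorted(read_circle(os.path.join(workdir, f"circle-{k:03d}")))
            for k in range(1, n + 1)}


with tempfile.TemporaryDirectory() as tmp:
    print(circles(EDGES, 1, 3, tmp))
# {1: [2, 3, 4], 2: [5, 6], 3: [7, 8]}
```

Each loop iteration stands for one job launch, which is where the overhead of this approach comes from.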
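If you need to compute circles for many users, the first approach is more suitable. The second approach has a large overhead because it launches a separate MR job for every degree, but it is much simpler and works fine for a small set of input users.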
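I'm new to this area, but here is my idea.
You can use the classic BFS algorithm, following the pseudocode below.
In each iteration you launch a Hadoop job that discovers all child nodes of the current working set that have not been visited yet.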
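// runLevelJob is a hypothetical helper that runs one Hadoop job and
// returns the unvisited children of the current working set
Set<Integer> bfs(Set<Integer> curNodes, Set<Integer> visited, int depth) {
    if (depth <= 0) {
        return visited;
    }
    // Run a Hadoop job on the current working set curNodes, restricted by visited;
    // the job produces the list of child nodes of the current working set
    Set<Integer> result = runLevelJob(curNodes, visited);
    visited.addAll(result);
    curNodes.clear();
    curNodes.addAll(result);
    return bfs(curNodes, visited, depth - 1);
}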
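The mapper and reducer for this job are shown below.
In this example I simply use static members to hold the working set, the visited set, and the result set.
A real implementation should use temporary files instead, because static fields are only shared with the driver when everything runs in a single JVM (local mode). There may be ways to optimize how the temporary data accumulated from one iteration to the next is persisted.
The input file I used for the job contains a list of tuples, one per line, e.g. 1,2 2,3 5,4 ...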
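// Imports for BFSExample.java (the mapper, reducer and driver)
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Emits every unvisited neighbour of a vertex in the current working set.
// The static sets are filled in by the driver (works in local, single-JVM mode only).
public static class VertexMapper extends
        Mapper<Object, Text, IntWritable, IntWritable> {
    private static Set<IntWritable> curVertex = null; // current working set
    private static IntWritable curLevel = null;       // BFS level, used as output key
    private static Set<IntWritable> visited = null;   // vertices already reached
    private final IntWritable source = new IntWritable();
    private final IntWritable target = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is one directed edge "source,target"
        StringTokenizer itr = new StringTokenizer(value.toString(), ",");
        if (itr.countTokens() == 2) {
            String sourceStr = itr.nextToken();
            String targetStr = itr.nextToken();
            try {
                source.set(Integer.parseInt(sourceStr));
                target.set(Integer.parseInt(targetStr));
                // Compare the parsed vertices, not the raw map() arguments
                if (curVertex.contains(source)
                        && !visited.contains(target)
                        && !source.equals(target)) {
                    context.write(curLevel, target);
                }
            } catch (NumberFormatException e) {
                System.err.println("Found key,value <" + sourceStr + "," + targetStr
                        + "> which cannot be parsed as int");
            }
        } else {
            System.err.println("Found malformed line: " + value);
        }
    }
}

// Collects the unique vertices found at this level into a static set.
// Nothing is written to the context because the job uses NullOutputFormat.
public static class UniqueReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private static Set<IntWritable> result = new HashSet<IntWritable>();

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        for (IntWritable val : values) {
            // Hadoop reuses the value object, so store a copy
            result.add(new IntWritable(val.get()));
        }
    }
}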
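Because VertexMapper only matches the first field of each line, a bidirectional friendship has to appear in both directions in the input; otherwise, starting from user 5 for example, the line 4,5 would never lead back to user 4. A small sketch for generating such an input file from the question's pairs, assuming the comma-separated one-edge-per-line layout described above (to_edge_lines and write_edge_file are hypothetical names):

```python
# Friendships from the question; each pair is bidirectional.
EDGES = [(1, 2), (1, 3), (1, 4), (4, 5), (4, 6), (5, 7), (5, 8)]


def to_edge_lines(edges):
    """Return one "source,target" line per direction of every friendship."""
    lines = []
    for a, b in edges:
        lines.append(f"{a},{b}")
        lines.append(f"{b},{a}")  # reverse direction, so b's friends include a
    return lines


def write_edge_file(path, edges):
    """Write the doubled edge list, one directed edge per line."""
    with open(path, "w") as f:
        f.write("\n".join(to_edge_lines(edges)) + "\n")


print(to_edge_lines(EDGES)[:4])
# ['1,2', '2,1', '1,3', '3,1']
```

This doubles the size of the input, which is the usual price of storing an undirected graph as a directed edge list.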
Running the job would look something like:
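// Inside BFSExample, which extends Configured and implements Tool
@Override
public int run(String[] args) throws Exception {
    // Level 1: start from vertex 1, which is already marked as visited
    UniqueReducer.result.clear();
    VertexMapper.curLevel = new IntWritable(1);
    VertexMapper.curVertex = new HashSet<IntWritable>(1);
    VertexMapper.curVertex.add(new IntWritable(1));
    VertexMapper.visited = new HashSet<IntWritable>(1);
    VertexMapper.visited.add(new IntWritable(1));

    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "BFS");
    job.setJarByClass(BFSExample.class);
    job.setMapperClass(VertexMapper.class);
    // No combiner: UniqueReducer writes nothing to the context, so as a combiner
    // it would swallow the map output before it reaches the reducer
    job.setReducerClass(UniqueReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputFormatClass(NullOutputFormat.class);
    boolean result = job.waitForCompletion(true);
    // UniqueReducer.result now holds the vertices discovered at this level
    return result ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    BFSExample bfs = new BFSExample();
    System.exit(ToolRunner.run(new Configuration(), bfs, args));
}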