如何使用多核并行性在 Java 中并发运行图形算法



我想使用多核并行性在大型图上运行算法。我已经为此工作了一段时间,但一直无法提出一个好的解决方案。

这是朴素的算法:

W - a very large number
double weight = 0
while(weight < W)
- v : get_random_node_from(Graph)
- weight += calculate(v)
  • 我研究了分叉和连接,但找不到将此问题划分为更小的子问题的方法。
  • 然后我尝试使用 Java 8 流,为此我需要创建一个 lambda 表达式。当我尝试做这样的事情时:

double weight = 0 Callable<Object> task = () -> { can not update weight here, as it needs to be final }

我的问题是,是否可以在 lambda 方法中更新像weight这样的变量?或者有更好的方法来解决这个问题?

我得到的最接近的是使用ExecutorService,但遇到了同步问题。

------------编辑--------------

以下是详细的算法:

简而言之,我正在尝试做的是遍历一个巨大的图,在随机选择的节点上执行操作(只要权重<W)并更新全局结构索引。>

这需要太长时间,因为它没有利用 CPU 的全部功能。

理想情况下,多个内核上的所有线程/进程将在随机选择的节点上执行操作,并更新共享权重索引

注意:不同的线程是否选取同一节点并不重要,因为它是随机的,没有替换。

算法:

函数串行 () {

List<List<Integer>> I (shared data structure which I want to update)
double weight
//// Task which I want to parallelize
while(weight < W) {
v : get_random_node_from(Graph)
bfs(v, affected_nodes) ...// this will fill up affected_nodes by v
foreach(affected_node in affected_nodes) {
// update I related to affected_node
// and do other computation
}
weight += affected_nodes.size()
}
///////// Parallelization ends here
use_index(I) // I is passed now to some other method(not important) to get further results
}

重要的是,所有线程都更新相同的Iweight

谢谢。

好吧,你可以将weight包装成一个由单个元素组成的数组,这是这种东西的已知技巧; 甚至由Java内部完成,如下所示:

weight[0] = weight[0] + calculate(v);

但是这存在问题,因为您将并行运行它。您将不会得到所需的结果,因为weight[0]不是线程安全的。你可以使用某种同步,但Java已经有一个很好的解决方案:DoubleAdder在竞争环境(和多个CPU)中扩展得更好。

一个微不足道的小例子:

DoubleAdder weight = new DoubleAdder();
private static int calculate(int v) {
return v + 1;
}

Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.parallel()
.forEach(x -> {
int y = calculate(x);
weight.add(y);
});
System.out.println(weight); // 54

然后是您将为此选择的随机发生器的问题:get_random_node_from(Graph).您确实需要获得一个随机Node,但同时您需要一次性获得所有这些。 但是,如果您可以将所有节点flatten到一个List中,则可能不需要它。

这里的问题是 Graph 通常以递归方式遍历,您不知道它的确切大小:

while(parent.hasChildren) {
traverse children and so on...
}

这将在流下并行处理不良,您可以自己查看Spliterators#spliteratorUnknownSize.它将从1024算术上增长;这就是为什么我建议将节点扁平化为一个已知大小的列表;这将更好地并行化。

最新更新