'
我正在浏览Agraph的行动和这本书(在这里的源代码:https://github.com/insidedctm/spark-graphx-in-action(讨论两种计算距离的方法(边缘啤酒花的数量(在树的根和所有节点之间下来的叶子。我了解使用的代码示例聚合。特别是,停止条件是有道理的(我有通过包括下面的文本"停止条件"。(一旦在顶点上的属性图形停止更改,继续运行算法不再有意义。
当我看着预算计算的pregel方式时,我有些困惑相同的结果(如下所示(
特别是当调用pregel的应用方法时,最大是默认值。因此,似乎是:
的" sendmsg"函数 (et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
即使在顶点上的值收敛之后,也将无限地称为。
。我忽略了一些机制使该程序在收敛后停止?
// aggregateMessages approach
// from: https://github.com/insidedctm/spark-graphx-in-action/blob/51e4c667b927466bd02a0a027ca36625b010e0d6/Chapter04/Listing4_10IteratedFurthestVertex.scala
def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
ec.sendToDst(ec.srcAttr+1)
}
def mergeMsg(a: Int, b: Int): Int = {
math.max(a,b)
}
def propagateEdgeCount(g:Graph[Int,String])
:Graph[Int,String] = {
val verts =
g.aggregateMessages[Int](sendMsg, mergeMsg)
val g2 =
Graph(verts, g.edges)
val check =
g2.vertices.join(g.vertices).
map(x => x._2._1 – x._2._2).
reduce(_ + _)
// STOP CONDITION
// check here ensures stop if nothing changed (******)
if (check > 0)
propagateEdgeCount(g2)
else
g
}
// Pregel approach
val g = Pregel(myGraph.mapVertices((vid,vd) => 0), 0,
activeDirection = EdgeDirection.Out)(
(id:VertexId,vd:Int,a:Int) => math.max(vd,a),
(et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
(a:Int,b:Int) => math.max(a,b))
g.vertices.collect
  据我所知,如果所有节点都停止,那么pregel将停止工作。
&essp; essp;有两种方法可以停止所有节点,这些节点可以通过所有节点的属性不变来实现:
-
1.换句话说,换句话说,如果给定的条件是错误的,节点将停止发送消息。
-
22 就是说,尽管发送消息条件仍然是正确的,但是所有节点的属性都没有变化。
val bfs2 = initialGraph2.pregel(Double.PositiveInfinity)( (id, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr == Double.PositiveInfinity) {Iterator((triplet.dstId, triplet.srcAttr+1))} else {Iterator.empty}}, (a,b) => math.min(a,b) ).cache()
"triplet.dstAttr == Double.PositiveInfinity"
是继续条件。
 如果所有节点的少于双重。