GraphFrames中的聚合信息没有输出



我只是从GraphFrames开始,尽管我关注文档,但我无法从gentRegateMessages函数中获得任何结果(它返回一个空数据库)。这是我问题的简化示例:i GraphFrames对象称为 testGraph,使我的vertexrdd仅由一个没有顶点属性的单个顶点Y组成,而我的edgerdd由两个类似的记录组成:

| src | dst | min_ts1 | min_ts2 |
|  X  |  Y  |    20   |   null  |
|  Y  |  X  |   null  |   -10   |

现在,我想实现一种简单的算法,该算法将min_ts1的值发送到dst,然后将min_ts2发送到src。我用来实现此算法的代码是:

import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages
val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")
val delay = testGraph
.aggregateMessages
  .sendToSrc(msgToSrc)
  .sendToDst(msgToDst)  
  .agg(sum(AM.msg).as("avg_time_delay")) 

我意识到这里有一些无效的值,但是无论我希望传递算法的消息会执行以下操作:查看第一个记录,并将20的消息发送到Y,以及null的消息到X。然后查看第二个记录,然后将null的消息发送到X,以及-10的消息到Y。最后,我希望结果表明,Y的消息总和是10,并且结果中没有X的记录,因为它未包含在Vertexrdd中。如果X包含在Vertexrdd中,我希望结果简单地是null,因为这两条消息均为null

但是,我得到的是一个空的RDD。有人可以帮我理解为什么我要取得空洞的结果?

好吧,看来此问题的原因确实是我的vertexrdd中没有X。我猜想,即使我的edgerdd中有边缘往返和从该顶点,我的聚合仅依赖于边缘属性,算法也无法发送这些消息。

相关内容

  • 没有找到相关文章

最新更新