我只是从GraphFrames开始,尽管我关注文档,但我无法从gentRegateMessages函数中获得任何结果(它返回一个空数据库)。这是我问题的简化示例:i GraphFrames对象称为 testGraph
,使我的vertexrdd仅由一个没有顶点属性的单个顶点Y
组成,而我的edgerdd由两个类似的记录组成:
| src | dst | min_ts1 | min_ts2 |
| X | Y | 20 | null |
| Y | X | null | -10 |
现在,我想实现一种简单的算法,该算法将min_ts1
的值发送到dst
,然后将min_ts2
发送到src
。我用来实现此算法的代码是:
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages
val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")
val delay = testGraph
.aggregateMessages
.sendToSrc(msgToSrc)
.sendToDst(msgToDst)
.agg(sum(AM.msg).as("avg_time_delay"))
我意识到这里有一些无效的值,但是无论我希望传递算法的消息会执行以下操作:查看第一个记录,并将20
的消息发送到Y
,以及null
的消息到X
。然后查看第二个记录,然后将null
的消息发送到X,以及-10
的消息到Y
。最后,我希望结果表明,Y
的消息总和是10
,并且结果中没有X
的记录,因为它未包含在Vertexrdd中。如果X
包含在Vertexrdd中,我希望结果简单地是null
,因为这两条消息均为null
。
但是,我得到的是一个空的RDD。有人可以帮我理解为什么我要取得空洞的结果?
好吧,看来此问题的原因确实是我的vertexrdd中没有X
。我猜想,即使我的edgerdd中有边缘往返和从该顶点,我的聚合仅依赖于边缘属性,算法也无法发送这些消息。