使用 Spark 计算节点之间的链接



我在Spark 2.2和Scala 2.11中有以下两个数据帧。数据帧edges定义有向图的边缘,而数据帧types定义每个节点的类型。

edges =
+-----+-----+----+
|from |to   |attr|
+-----+-----+----+
|    1|    0|   1|
|    1|    4|   1|
|    2|    2|   1|
|    4|    3|   1|
|    4|    5|   1|
+-----+-----+----+
types =
+------+---------+
|nodeId|type     |
+------+---------+
|     0|        0|
|     1|        0|
|     2|        2|
|     3|        4|
|     4|        4|
|     5|        4|
+------+---------+

对于每个节点,我想知道同一type节点的边数。请注意,我只想计算从节点传出的边,因为我处理的是有向图。

为了达到此目标,我执行了两个数据帧的联接:

val graphDF = edges
.join(types, types("nodeId") === edges("from"), "left")
.drop("nodeId")
.withColumnRenamed("type","type_from")
.join(types, types("nodeId") === edges("to"), "left")
.drop("nodeId")
.withColumnRenamed("type","type_to")

我获得了以下新的数据帧graphDF

+-----+-----+----+---------------+---------------+
|from |to   |attr|type_from      |type_to        |
+-----+-----+----+---------------+---------------+
|    1|    0|   1|              0|              0|
|    1|    4|   1|              0|              4|
|    2|    2|   1|              2|              2|
|    4|    3|   1|              4|              4|
|    4|    5|   1|              4|              4|
+-----+-----+----+---------------+---------------+

现在我需要得到以下最终结果:

+------+---------+---------+
|nodeId|numLinks |type     |
+------+---------+---------+
|     0|        0|        0| 
|     1|        1|        0|
|     2|        0|        2|
|     3|        0|        4|
|     4|        2|        4|
|     5|        0|        4| 
+------+---------+---------+

我正在考虑使用groupByagg(count(...),但我不知道如何处理有向边。

更新:

numLinks计算为从给定节点传出的边数。例如,节点 5 没有任何传出边(只有传入边4->5,请参阅数据帧edges)。同样是指节点 0。但是节点 4 有两个传出边(4->34->5)。

我的解决方案:

这是我的解决方案,但它缺少那些具有 0 个链接的节点。

graphDF.filter("from != to").filter("type_from == type_to").groupBy("from").agg(count("from") as "numLinks").show()

您可以使用类型进行筛选、按 id 和类型聚合,并添加缺少的节点:

val graphDF = Seq(
(1, 0, 1, 0, 0), (1, 4, 1, 0, 4), (2, 2, 1, 2, 2),
(4, 3, 1, 4, 4), (4, 5, 1, 4, 4)
).toDF("from", "to", "attr", "type_from", "type_to")
val types = Seq(
(0, 0), (1, 0), (2, 2), (3, 4), (4,4), (5, 4)
).toDF("nodeId", "type")
graphDF
// I want to know the number of edges to the nodes of the same type
.where($"type_from" === $"type_to" && $"from" =!= $"to")
// I only want to count the edges outgoing from a node,
.groupBy($"from" as "nodeId", $"type_from" as "type")
.agg(count("*") as "numLinks")
// but it lacks those nodes that have 0 links.
.join(types, Seq("nodeId", "type"), "rightouter")
.na.fill(0)
// +------+----+--------+
// |nodeId|type|numLinks|
// +------+----+--------+
// |     0|   0|       0|
// |     1|   0|       1|
// |     2|   2|       1|
// |     3|   4|       0|
// |     4|   4|       2|
// |     5|   4|       0|
// +------+----+--------+

要跳过自链接,请将$"from" =!= $"to"添加到选择中:

graphDF
.where($"type_from" === $"type_to" && $"from" =!= $"to")
.groupBy($"from" as "nodeId", $"type_from" as "type")
.agg(count("*") as "numLinks")
.join(types, Seq("nodeId", "type"), "rightouter")
.na.fill(0)
// +------+----+--------+
// |nodeId|type|numLinks|
// +------+----+--------+
// |     0|   0|       0|
// |     1|   0|       1|
// |     2|   2|       0|
// |     3|   4|       0|
// |     4|   4|       2|
// |     5|   4|       0|
// +------+----+--------+

相关内容

  • 没有找到相关文章

最新更新