我正在使用Spark Graphframes库来创建身份解析系统。我已经能够使用火花来找到匹配项。我的计划是使用图表来查找人与人之间的瞬态联系,并为其分配一个 ID 以进行进一步分析等。
我使用了以下数据(来自公共febrl数据库):
顶点数据示例:
+----------+--------+-------------+-------------------+--------------------+----------------+--------+-----+-------------+----------+---+-----+
|given_name| surname|street_number| address_1| address_2| suburb|postcode|state|date_of_birth|soc_sec_id| id|block|
+----------+--------+-------------+-------------------+--------------------+----------------+--------+-----+-------------+----------+---+-----+
| michaela| neumann| 8| stanley street| miami| winston hills| 4223| nsw| 19151111| 5304218| 0| mneu|
| courtney| painter| 12| pinkerton circuit| bega flats| richlands| 4560| vic| 19161214| 4066625| 1| cpai|
| charles| green| 38|salkauskas crescent| kela| dapto| 4566| nsw| 19480930| 4365168| 2| cgre|
| vanessa| parr| 905| macquoid place| broadbridge manor| south grafton| 2135| sa| 19951119| 9239102| 3| vpar|
| mikayla|malloney| 37| randwick road| avalind|hoppers crossing| 4552| vic| 19860208| 7207688| 4| mmal|
| blake| howie| 1| cutlack street|belmont park belt...| budgewoi| 6017| vic| 19250301| 5180548| 5| bhow|
| blakeston| broadby| 53| traeger street| valley of springs| north ward| 3083| qld| 19120907| 4308555| 7| bbro|
| edward| denholm| 10| corin place| gold tyne| clayfield| 4221| vic| 19660306| 7119771| 9| eden|
| charlie|alderson| 266|hawkesbury crescent|deergarden caravn...| cooma| 4128| vic| 19440908| 1256748| 10| cald|
| molly| roche| 59|willoughby crescent| donna valley| carrara| 4825| nsw| 19200712| 1847058| 11| mroc|
+----------+--------+-------------+-------------------+--------------------+----------------+--------+-----+-------------+----------+---+-----+
边缘数据示例:
+---+-----+-----+
|src| dst|match|
+---+-----+-----+
| 0|10000| 1|
| 1|17750| 1|
| 1|10001| 1|
| 1| 7750| 1|
| 2|19656| 1|
| 2|10002| 1|
| 2| 9656| 1|
| 3|19119| 1|
| 3|10003| 1|
| 3| 9119| 1|
+---+-----+-----+
创建的图形:
g = GraphFrame(vertix_data, edge_data)
使用的连接组件:
connected = g.connectedComponents(algorithm='graphframes')
这导致:
+----------+--------+-------------+-------------------+--------------------+----------------+--------+-----+-------------+----------+---+-----+---------+
|given_name| surname|street_number| address_1| address_2| suburb|postcode|state|date_of_birth|soc_sec_id| id|block|component|
+----------+--------+-------------+-------------------+--------------------+----------------+--------+-----+-------------+----------+---+-----+---------+
| michaela| neumann| 8| stanley street| miami| winston hills| 4223| nsw| 19151111| 5304218| 0| mneu| 0|
| courtney| painter| 12| pinkerton circuit| bega flats| richlands| 4560| vic| 19161214| 4066625| 1| cpai| 1|
| charles| green| 38|salkauskas crescent| kela| dapto| 4566| nsw| 19480930| 4365168| 2| cgre| 2|
| vanessa| parr| 905| macquoid place| broadbridge manor| south grafton| 2135| sa| 19951119| 9239102| 3| vpar| 3|
| mikayla|malloney| 37| randwick road| avalind|hoppers crossing| 4552| vic| 19860208| 7207688| 4| mmal| 4|
| blake| howie| 1| cutlack street|belmont park belt...| budgewoi| 6017| vic| 19250301| 5180548| 5| bhow| 5|
| blakeston| broadby| 53| traeger street| valley of springs| north ward| 3083| qld| 19120907| 4308555| 7| bbro| 7|
| edward| denholm| 10| corin place| gold tyne| clayfield| 4221| vic| 19660306| 7119771| 9| eden| 9|
| charlie|alderson| 266|hawkesbury crescent|deergarden caravn...| cooma| 4128| vic| 19440908| 1256748| 10| cald| 10|
| molly| roche| 59|willoughby crescent| donna valley| carrara| 4825| nsw| 19200712| 1847058| 11| mroc| 11|
+----------+--------+-------------+-------------------+--------------------+----------------+--------+-----+-------------+----------+---+-----+---------+
组件列并不总是以 1 为增量增加,但似乎随机跳过数字,我想确保以 1 为增量的增加,因为使用此数字为每个人分配一个 id。 有人知道为什么Graphframes会这样做吗?
当我进一步研究这个问题时,对于我的开发数据帧中的大约 20,000 行,大约 17% 的条目在其中跳过。在极端情况下,差距可能高达 20-30 左右,即一行 id 是 5846,下一行是 5868。我担心的是,当我扩展到数百万和数亿时,id之间的差距会变得非常大,这可能会产生问题。
TL;DR:为什么Sparks连接的组件似乎随机跳过值,而不是总是递增1?
Graphframe 文档从不承诺连续的 id,相反,它提供的唯一保证是:
生成的数据帧包含所有顶点信息和一列附加列:
组件(长类型):此组件的唯一 ID
在实践中,GraphX 实现使用组件中的最小 ID(">返回一个顶点值的图形,其中包含包含该顶点的连接组件中的最低顶点 ID"),并且 Graphframe 似乎也做了同样的事情。
就像@user10802135说的,组件值不能保证是连续的。 如果要使它们按顺序排列,则需要对组件字段进行一些后处理。 对此的 pyspark 解决方案如下所示:
import pyspark.sql.functions as F
from pyspark.sql import Window
# Define our window for partitioning data on - necessary for dense_rank() function
windowSpec = Window.partitionBy(F.lit(1)).orderBy('component')
# Redefine the component field, now in sequential order
df = df.withColumn('component', F.dense_rank().over(windowSpec))
通过按文字值 1 进行分区,所有行都被视为dense_rank()
,排名顺序由.orderBy()
参数确定。 在这种情况下,.orderBy()
参数设置为'component'
,默认情况下将按升序排序。.dense_rank()
功能可确保同一组件下的记录将被赋予相同的返回值,这是使用rank()
无法确保的。
这里有一些关于.dense_rank()
和其他窗口函数的很好的例子和解释。