构建大规模层次数据树路径的有效方法



我有一个由网络元素组成的大型数据集(想想:大数据(,它们形成了一个树状网络。

玩具数据集如下所示:

|   id | type   | parent_id   |
|-----:|:-------|:------------|
|    1 | D      | <NA>        |
|    2 | C      | 1           |
|    3 | C      | 2           |
|    4 | C      | 3           |
|    5 | B      | 3           |
|    6 | B      | 4           |
|    7 | A      | 4           |
|    8 | A      | 5           |
|    9 | A      | 3           |

重要规则:

  • 根节点(在D类型的玩具示例中(和叶节点(在A类型的玩具实例中(不能相互连接,也不能相互连接。即,D节点不能与另一个D节点连接(a节点反之亦然(,a节点不能直接与D节点连接
  • 出于简单的原因,任何其他节点类型都可以根据类型随机连接
  • 树的深度可以是任意深度
  • 叶节点总是类型A
  • 叶节点不需要通过所有中间节点进行连接。事实上,只有少数中介节点是必须通过的。这个例子可以忽略这种情况
  • 如果您要建议在Spark中执行此操作,那么编写解决方案时必须考虑到pyspark

我想实现的是建立一种有效的方法(最好是在Spark中(来计算每个节点的树路径,如下所示:

|   id | type   | parent_id   | path                |
|-----:|:-------|:------------|:--------------------|
|    1 | D      | <NA>        | D:1                 |
|    2 | C      | 1           | D:1>C:2             |
|    3 | C      | 2           | D:1>C:2>C:3         |
|    4 | C      | 3           | D:1>C:2>C:3>C:4     |
|    5 | B      | 3           | D:1>C:2>C:3>B:5     |
|    6 | B      | 4           | D:1>C:2>C:3>C:4>B:6 |
|    7 | A      | 4           | D:1>C:2>C:3>C:4>A:7 |
|    8 | A      | 5           | D:1>C:2>C:3>B:5>A:8 |
|    9 | A      | 3           | D:1>C:2>C:3>A:9     |

注意

树路径中的每个元素都是这样构造的:id:type

如果你有其他有效的方法来存储树路径(例如闭包表(并计算它们,我也很高兴听到它们。然而,计算的运行时间必须非常低(少于一小时,最好是几分钟(,并且稍后的检索需要在几秒钟内。

最终目标是拥有一个数据结构,使我能够高效地聚合某个节点下的任何网络节点(最多运行几秒钟(。

由大约3M个节点组成的实际数据集可以这样构建:

注意

  • 生成上面显示的玩具示例的注释node_counts
  • 节点元素的分布接近实际
import random
import pandas as pd
random.seed(1337)
node_counts = {'A': 1424383, 'B': 596994, 'C': 234745, 'D': 230937, 'E': 210663, 'F': 122859, 'G': 119453, 'H': 57462, 'I': 23260, 'J': 15008, 'K': 10666, 'L': 6943, 'M': 6724, 'N': 2371, 'O': 2005, 'P': 385}
#node_counts = {'A': 3, 'B': 2, 'C': 3, 'D': 1}
elements = list()
candidates = list()
root_type = list(node_counts.keys())[-1]
leaf_type = list(node_counts.keys())[0]
root_counts = node_counts[root_type]
leaves_count = node_counts[leaf_type]
ids = [i + 1 for i in range(sum(node_counts.values()))]
idcounter = 0
for i, (name, count) in enumerate(sorted(node_counts.items(), reverse=True)):
for _ in range(count):
_id = ids[idcounter]
idcounter += 1
_type = name
if i == 0:
_parent = None
else:
# select a random one that is not a root or a leaf
if len(candidates) == 0: # first bootstrap case
candidate = random.choice(elements)
else:
candidate = random.choice(candidates)
_parent = candidate['id']
_obj = {'id': _id, 'type': _type, 'parent_id': _parent}
#print(_obj)
elements.append(_obj)
if _type != root_type and _type != leaf_type:
candidates.append(_obj)
df = pd.DataFrame.from_dict(elements).astype({'parent_id': 'Int64'})

为了使用上述玩具数据在纯python中生成树路径,您可以使用以下函数:

def get_hierarchy_path(df, cache_dict, ID='id', LABEL = 'type', PARENT_ID = 'parent_id', node_sep='|', elem_sep=':'):
def get_path(record):
if pd.isna(record[PARENT_ID]):
return f'{record[LABEL]}{elem_sep}{record[ID]}'
else:
if record[PARENT_ID] in cache_dict:
parent_path = cache_dict[record[PARENT_ID]]
else:
try:
parent_path = get_path(df.query(f'{ID} == {record[PARENT_ID]}').iloc[0])
except IndexError as e:
print(f'Index Miss for {record[PARENT_ID]} on record {record.to_dict()}')
parent_path = f'{record[LABEL]}{elem_sep}{record[ID]}'
cache_dict[record[PARENT_ID]] = parent_path
return f"{parent_path}{node_sep}{record[LABEL]}{elem_sep}{record[ID]}"
return df.apply(get_path, axis=1)
df['path'] = get_hierarchy_path(df, dict(), node_sep='>')

我已经尝试过的:

  • 在大型数据集上使用上述函数在纯python中计算大约需要5.5小时。所以这并不是一个真正的解决方案。任何比这更快的都将不胜感激
  • 从技术上讲,使用sparkgraphframes软件包,我可以使用BFS。这将为我提供一个针对单个离开节点的良好解决方案,但它不能扩展到整个网络
  • 我认为Pregel是来这里的路。但我不知道如何在Pyspark中构建它

感谢您的帮助。

我目前针对这一挑战的解决方案现在不再依赖Spark,而是依赖SQL。我将整个数据集加载到Postgres数据库中,并在id、type和parent_id上放置一个Unique Index。

然后使用以下查询,我可以计算路径:

with recursive recursive_hierarchy AS (
-- starting point
select 
parent_id
, id 
, type 
, type || ':' || id as path
, 1 as lvl
from hierarchy.nodes
union all
-- recursion
select
ne.parent_id as parent_id
, h.id 
, h.type 
, ne.type || ':' || ne.id || '|' || h.path as path
, h.lvl + 1 as lvl
from (
select * 
from hierarchy.nodes
) ne
inner join recursive_hierarchy h
on ne.id = h.parent_id
), paths as (
-- complete results
select 
*
from recursive_hierarchy
), max_lvl as (
-- retrieve the longest path of a network element
select
id
, max(lvl) as max_lvl
from paths
group by id
)
-- all results with only the longest path of a network element
select distinct
, p.id
, p.type
, p.path
from paths p
inner join max_lvl l
on p.id = l.id
and p.lvl = l.max_lvl

最新更新