如何在pyspark中使用所有可能的元素组合创建新的Rdd



嗨,我在下面创建了一个Rdd

rdd1=sc.parallelize(['P','T','K'])
rdd1.collect()
['P', 'T', 'K']

现在,我想创建新的RDD2,其中包含所有可能的组合,如下面所示,以及新的RDD。即,除了相同的元素组合,如(p,p(、(k,k(、(t,t(。

我在做时的预期输出

RDD2.collect()
[
('P'),('T'),('K'),
('P','T'),('P','K'),('T','K'),('T','P'),('K','P'),('K','T'),
('P','T','K'),('P','K','T'),('T','P','K'),('T','K','P'),('K','P','T'),('K','T','P')
]

您似乎想要生成rdd中元素的所有排列,其中每一行都包含唯一的值。

一种方法是首先创建一个辅助函数来生成所需的长度n:的组合

from functools import reduce
from itertools import chain
def combinations_of_length_n(rdd, n):
# for n > 0
return reduce(
lambda a, b: a.cartesian(b).map(lambda x: tuple(chain.from_iterable(x))),
[rdd]*n
).filter(lambda x: len(set(x))==n)

从本质上讲,该函数将对自己进行rddn笛卡尔乘积,并只保留所有值不同的行。

我们可以对n = [2, 3]:进行测试

print(combinations_of_length_n(rdd1, n=2).collect())
#[('P', 'T'), ('P', 'K'), ('T', 'P'), ('K', 'P'), ('T', 'K'), ('K', 'T')]
print(combinations_of_length_n(rdd1, n=3).collect())
#[('P', 'T', 'K'),
# ('P', 'K', 'T'),
# ('T', 'P', 'K'),
# ('K', 'P', 'T'),
# ('T', 'K', 'P'),
# ('K', 'T', 'P')]

您想要的最终输出只是这些中间结果中的union和原始rdd(值映射到tuples(。

rdd1.map(lambda x: tuple((x,)))
.union(combinations_of_length_n(rdd1, 2))
.union(combinations_of_length_n(rdd1, 3)).collect()
#[('P',),
# ('T',),
# ('K',),
# ('P', 'T'),
# ('P', 'K'),
# ('T', 'P'),
# ('K', 'P'),
# ('T', 'K'),
# ('K', 'T'),
# ('P', 'T', 'K'),
# ('P', 'K', 'T'),
# ('T', 'P', 'K'),
# ('K', 'P', 'T'),
# ('T', 'K', 'P'),
# ('K', 'T', 'P')]

为了推广任何最大重复次数:

num_reps = 3
reduce(
lambda a, b: a.union(b),
[
combinations_of_length_n(rdd1.map(lambda x: tuple((x,))), i+1) 
for i in range(num_reps)
]
).collect()
#Same as above

注意:笛卡尔乘积是昂贵的运算,应尽可能避免。

有几种方法。您可以运行一个循环,获取排列并将它们存储在列表中,然后将列表转换为rdd-

>>> rdd1.collect()
['P', 'T', 'K']
>>> 
>>> l = []
>>> for i in range(2,rdd1.count()+1):
...     x = list(itertools.permutations(rdd1.toLocalIterator(),i))
...     l = l+x
... 
>>> rdd2 = sc.parallelize(l)
>>> 
>>> rdd2.collect()
[('P', 'T'), ('P', 'K'), ('T', 'P'), ('T', 'K'), ('K', 'P'), ('K', 'T'), ('P', 'T', 'K'), ('P', 'K', 'T'), ('T', 'P', 'K'), ('T', 'K', 'P'), ('K', 'P', 'T'), ('K', 'T', 'P')]

最新更新