按值 PySpark 过滤 RDD



我正在使用PySpark,我正在寻找一种方法来检查:

对于给定的check_number = 01

如果 Myrdd1中第三个元素的值不包含 check_number ==>从rdd2获取有关此check_number的所有信息..

鉴于:

rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
u'serviceXXX',
u'testAB_02',
u'2016-07-03')])

假设第一个元素是ID,第二个是服务名称,第三个是测试名称,带有ID,第四个元素是日期。

rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02'),
(u'XXXX52547412558933nnBlmquhdyhM',
u'02',
u'2016-11-04')])

假设第一个元素是 ID,第二个是测试 ID,最后一个元素是日期。

所以,这里我在我的rdd1testAB_02中,它与我的check_number不匹配(所以服务名称必须以check_number的值结尾)。我的目标是从rdd2获取所有行,01作为测试 id。此处的预期输出必须是:

[(u'9b023b8233c242c09b93506942002e0a',
u'01',
u'2016-11-02')

这是我的代码:

def update_typesdecohorte_table(rdd1, rdd2):
if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True:
new_rdd2 = rdd2.filter(lambda x : x[1] == check_number)
else:
pass
return new_rdd2
new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)

威奇给出:

[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]

这段代码有效,但我不喜欢这种方法。最有效的方法是什么?

如果你想从rdd2中获取rdd1中没有匹配元素的所有记录,你可以使用cartesian

new_rdd2 = rdd1.cartesian(rdd2)
.filter(lambda r: not r[0][2].endswith(r[1][1]))
.map(lambda r: r[1])

如果check_number是固定的,请在末尾按以下值进行筛选:

new_rdd2.filter(lambda r: r[1] == check_number).collect()

但是,如果您的check_number是固定的并且两个RDD都很大,那么它甚至比您的解决方案还要慢,因为它需要在连接期间对分区进行洗牌(您的代码仅执行非随机转换)。

相关内容

  • 没有找到相关文章

最新更新