根据第二个rdd的值筛选rdd



我有两个rdd,我想用另一个的值来过滤其中一个。

每个rdd的一些实例如下:

rdd1 = [((address1, date1),1), ((address5, date2),1), ((address1, date2),1), ((address2,date3),1)]
rdd2 = [(address1,1), (address1,1), (address2, 1), (address1, 1)]

所需输出为:

joined_rdd = [((address1, date1),1),((address1, date2),1),((address2,date3),1)]

因此,基本上,如果rdd2中存在元组中的地址值,我希望将该元组保留在rdd1中。

执行联接并丢弃rdd2:中的所有内容

rdd1 = sc.parallelize([(('address1', 'date1'),1), (('address5', 'date2'),1), (('address1', 'date2'),1), (('address2','date3'),1)])
rdd2 = sc.parallelize([('address1',1), ('address1',1), ('address2', 1), ('address1', 1)])
result_rdd = (rdd1.keyBy(lambda x: x[0][0])
.join(rdd2.map(lambda x: x[0])
.keyBy(lambda x: x)
.distinct())
.map(lambda x: x[1][0]))
result_rdd.collect()
[(('address2', 'date3'), 1), (('address1', 'date1'), 1), (('address1', 'date2'), 1)]

最新更新