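I have 2 RDDs, one as a list of dictionaries and the second as a list of tuples, as shown below:
RDD1 = [{'id1': ['string', 'string', count]}, {'id2': ['string', 'string', count]}, {'id3': ['string', 'string', count]}]
RDD2 = [(id1, count), (id2, count), (id3, count)]
Now I want to add the count from RDD2 to the count in RDD1 wherever an id from RDD2 matches an id in RDD1. Can you help me achieve this?
Thank you in advance.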
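Although Gates's answer is correct, you should try to avoid for loops when working with RDDs. Operations on RDDs run in parallel, so on large data sets they are much faster than a for loop over the same data. You can get the same result by joining the two RDDs on the id and then reformatting the output: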
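# sc is an existing SparkContext (for example, the one provided by the pyspark shell)
rdd1 = sc.parallelize([{'id1':['string','string',1]}, {'id2':['string','string',2]}, {'id3':['string','string',3]}])
rdd2 = sc.parallelize([('id1',2), ('id2',4), ('id3',6), ('id4',8)])
# flatten each dict into (id, list) pairs, then inner-join on id: (id, (list, count))
rdd_joined = rdd1.flatMap(lambda x: list(x.items())).join(rdd2)
# index into the pair instead of unpacking in the lambda signature, which Python 3 does not allow
rdd_reformatted = rdd_joined.map(lambda kv: {kv[0]: kv[1][0][:-1] + [kv[1][0][-1] + kv[1][1]]})
rdd_reformatted.collect()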
This gives as output (the order of elements may vary):
[{'id2': ['string', 'string', 6]},
{'id3': ['string', 'string', 9]},
{'id1': ['string', 'string', 3]}]
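To make the three stages easier to follow, here is a minimal plain-Python sketch of what the flatMap, join and map steps compute. It does not use Spark at all; the names records, counts, pairs, joined and reformatted are illustrative only, and it assumes each id appears at most once in the list of counts.

```python
# Plain-Python walk-through of the flatMap -> join -> map pipeline (no Spark needed).
records = [{'id1': ['string', 'string', 1]},
           {'id2': ['string', 'string', 2]},
           {'id3': ['string', 'string', 3]}]
counts = [('id1', 2), ('id2', 4), ('id3', 6), ('id4', 8)]

# Stage 1 (like flatMap over items): one (id, list) pair per dictionary entry.
pairs = [item for record in records for item in record.items()]

# Stage 2 (like an inner join): keep only ids present on both sides,
# producing (id, (list, count)). Assumes ids are unique in counts.
count_by_id = dict(counts)
joined = [(key, (value, count_by_id[key]))
          for key, value in pairs if key in count_by_id]

# Stage 3 (like map): rebuild each dictionary with the last element incremented.
reformatted = [{key: value[:-1] + [value[-1] + extra]}
               for key, (value, extra) in joined]

print(reformatted)
```

Note that 'id4' does not appear in the result because an inner join keeps only ids found on both sides. By the same rule, an id that existed only in rdd1 would be dropped as well.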
I hope this helps.
# Nested-loop version on plain Python lists (not Spark RDDs)
rdd1 = [{'id1':['string','string',1]}, {'id2':['string','string',2]}, {'id3':['string','string',3]}]
rdd2 = [('id1',2), ('id2',4), ('id3',6), ('id4',8)]
for each in rdd2:
    there = False
    position = 0
    for ele in rdd1:
        if each[0] in ele.keys():
            #now increment the count
            original = rdd1[position]
            originalList = original[each[0]]
            #updating the 3rd element (newList is the same list object, so this changes it in place)
            newList = originalList
            newList[2] = originalList[2] + each[1]
            #update the new list to key
            updated = { each[0] : newList }
            rdd1[position] = updated
            there = True
            break
        position = position + 1
print(rdd1)
#output: [{'id1': ['string', 'string', 3]}, {'id2': ['string', 'string', 6]}, {'id3': ['string', 'string', 9]}]
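The nested loop above scans rdd1 once for every tuple in rdd2. As a possible alternative, the sketch below builds a lookup dictionary from the tuples first, so each record is visited only once. The helper name add_counts is hypothetical, and the sketch assumes each id appears at most once in the list of counts. It returns new lists rather than modifying the input in place.

```python
def add_counts(records, counts):
    """Return new records with the matching count added to the last list element."""
    count_by_id = dict(counts)  # assumes each id appears at most once in counts
    result = []
    for record in records:
        updated = {}
        for key, values in record.items():
            # ids without a match keep their original count (0 is added)
            updated[key] = values[:-1] + [values[-1] + count_by_id.get(key, 0)]
        result.append(updated)
    return result


rdd1 = [{'id1': ['string', 'string', 1]},
        {'id2': ['string', 'string', 2]},
        {'id3': ['string', 'string', 3]}]
rdd2 = [('id1', 2), ('id2', 4), ('id3', 6), ('id4', 8)]

print(add_counts(rdd1, rdd2))
```

Unlike the join-based approach, records whose id has no match in the counts are kept unchanged here rather than dropped.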