所以我正在尝试使用Python(Pyspark)学习Spark。我想知道函数mapPartitions
是如何工作的。这就是它所接受的输入和它所给予的输出。我在网上找不到任何合适的例子。比方说,我有一个包含列表的RDD对象,如下面所示。
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
我想从所有列表中删除元素2,如何使用mapPartitions
来实现这一点。
mapPartition
应该被认为是分区上的映射操作,而不是分区元素上的映射。它的输入是一组当前分区,它的输出将是另一组分区。
传递给map
操作的函数必须采用RDD 的单个元素
传递给mapPartition
的函数必须采用RDD类型的可迭代函数,并返回其他类型或相同类型的可遍历函数。
在你的情况下,你可能只想做一些类似的事情:
def filter_out_2(line):
return [x for x in line if x != 2]
filtered_lists = data.map(filterOut2)
如果你想使用mapPartition
,它将是:
def filter_out_2_from_partition(list_of_lists):
final_iterator = []
for sub_list in list_of_lists:
final_iterator.append( [x for x in sub_list if x != 2])
return iter(final_iterator)
filtered_lists = data.mapPartition(filterOut2FromPartion)
使用yield
语法将mapPartitions与生成器函数一起使用更容易:
def filter_out_2(partition):
for element in partition:
if element != 2:
yield element
filtered_lists = data.mapPartitions(filter_out_2)
需要最终迭代
def filter_out_2(partition):
for element in partition:
sec_iterator = []
for i in element:
if i!= 2:
sec_iterator.append(i)
yield sec_iterator
filtered_lists = data.mapPartitions(filter_out_2)
for i in filtered_lists.collect(): print(i)
def func(l):
for i in l:
yield i+"ajbf"
mylist=['madhu','sdgs','sjhf','mad']
rdd=sc.parallelize(mylist)
t=rdd.mapPartitions(func)
for i in t.collect():
print(i)
for i in t.collect():
print(i)
在上面的代码中,我可以从第二个获得…的数据。。循环中。。作为每个生成器,它不应该在循环上迭代一次就取值