pyspark mapPartitions函数是如何工作的



所以我正在尝试使用Python(Pyspark)学习Spark。我想知道函数mapPartitions是如何工作的。这就是它所接受的输入和它所给予的输出。我在网上找不到任何合适的例子。比方说,我有一个包含列表的RDD对象,如下面所示。

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

我想从所有列表中删除元素2,如何使用mapPartitions来实现这一点。

mapPartition应该被认为是分区上的映射操作,而不是分区元素上的映射。它的输入是一组当前分区,它的输出将是另一组分区。

传递给map操作的函数必须采用RDD 的单个元素

传递给mapPartition的函数必须采用RDD类型的可迭代函数,并返回其他类型或相同类型的可遍历函数。

在你的情况下,你可能只想做一些类似的事情:

def filter_out_2(line):
    return [x for x in line if x != 2]
filtered_lists = data.map(filterOut2)

如果你想使用mapPartition,它将是:

def filter_out_2_from_partition(list_of_lists):
  final_iterator = []
  for sub_list in list_of_lists:
    final_iterator.append( [x for x in sub_list if x != 2])
  return iter(final_iterator)
filtered_lists = data.mapPartition(filterOut2FromPartion)

使用yield语法将mapPartitions与生成器函数一起使用更容易:

def filter_out_2(partition):
    for element in partition:
        if element != 2:
            yield element
filtered_lists = data.mapPartitions(filter_out_2)

需要最终迭代

def filter_out_2(partition):
for element in partition:
    sec_iterator = []
    for i in element:
        if i!= 2:
            sec_iterator.append(i)
    yield sec_iterator
filtered_lists = data.mapPartitions(filter_out_2)
for i in filtered_lists.collect(): print(i)
     def func(l):
         for i in l:
             yield i+"ajbf"

     mylist=['madhu','sdgs','sjhf','mad']
     rdd=sc.parallelize(mylist)
     t=rdd.mapPartitions(func)
     for i in t.collect():
         print(i)
     for i in t.collect():
        print(i)

在上面的代码中,我可以从第二个获得…的数据。。循环中。。作为每个生成器,它不应该在循环上迭代一次就取值

最新更新