在RDD转换时保留Spark DataFrame列分区



给定一个Spark DataFrame,它看起来像这样:

==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
|    A |    1 |   11 | .. |   21 |
|    A |   31 |   41 | .. |   51 |
|    B |    2 |   12 | .. |   22 |
|    B |   32 |   42 | .. |   52 |
==================================

我想运行逻辑,执行表的一个分区的聚合/计算,对应于一个特定的Name值。上述逻辑要求分区的全部内容——并且该分区——在执行逻辑的节点的内存中具体化;它看起来就像下面的processSegment函数:

def processDataMatrix(dataMatrix):
    # do some number crunching on a 2-D matrix
def processSegment(dataIter):
    # "running" value of the Name column in the iterator
    dataName = None
    # as the iterator is processed, put the data in a matrix
    dataMatrix = []
    for dataTuple in dataIter:
        # separate the name column from the other columns
        (name, *values) = dataTuple
        # SANITY CHECK: ensure that all rows have same name
        if (dataName is None):
            dataName = name
        else:
            assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName)
        # put the row in the matrix
        dataMatrix.append(values)
    # if any rows were processed, number-crunch the matrix
    if (dataName is not None):
        return processDataMatrix(dataMatrix)
    else:
        return []

我尝试通过基于Name列重新分区,然后通过底层RDD上的mapPartitions在每个分区上运行processSegment来实现此工作:

result = 
    stacksDF 
        .repartition('Name') 
        .rdd 
        .mapPartitions(processSegment) 
        .collect()

然而,processSegment中的SANITY CHECK断言通常会失败:

AssertionError: row name Q7 does not match expected A9

为什么当我试图在底层RDD上运行mapPartitions时,表面上在DataFrame上执行的分区没有被保留?如果上面的方法无效,是否有一些方法(使用DataFrame API或RDD API)使我能够在DataFrame分区的内存再现上执行聚合逻辑?

(由于我使用的是PySpark,而我希望执行的特定数字运算逻辑是Python,因此用户定义的聚合函数(UDAFs)似乎不是一个选项。)

我相信您误解了分区的工作原理。一般来说,分配器是一个满射函数,而不是双射函数。虽然一个特定值的所有记录将被移动到单个分区,但分区可能包含多个不同值的记录。

DataFrame API不给你任何分区控制,但是可以在使用RDD API时定义自定义partitionFunc。这意味着你可以使用一个双目标,例如:

mapping = (df
    .select("Name")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .zipWithIndex()
    .collectAsMap())
def partitioner(x):
    return mapping[x]

并按如下方式使用:

df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner)

尽管您必须记住分区不是空闲的,如果唯一值的数量很大,它可能会成为一个严重的性能问题。

相关内容

  • 没有找到相关文章