为什么df.limit在Pyspark中不断变化



我正在使用从某个数据帧df创建一个数据样本

rdd = df.limit(10000).rdd

这个操作需要相当长的时间(实际上为什么?它不能在10000行之后缩短吗?),所以我假设我现在有了一个新的RDD。

然而,当我现在处理rdd时,每次访问它都是不同的行。就好像它会重新采样一样。缓存RDD有点帮助,但这肯定不是节省吗?

背后的原因是什么?

更新:这是Spark 1.5.2 的复制品

from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row:row.i).reduce(add))

输出为

499500
19955500
49651500

我很惊讶.rdd没有修复数据。

编辑:为了表明它比重新执行问题更棘手,这里有一个在Spark 2.0.0.2.5.0 上产生错误结果的单一操作

from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

基本上,无论何时使用limit,结果都可能是错误的。我的意思不是"只是众多样本中的一个",而是非常不正确的(因为在这种情况下,结果应该总是12345)。

因为Spark是分布式的,所以通常假设确定性结果是不安全的。您的示例是以DataFrame的"第一个"10000行为例。在这里,"第一"的含义存在歧义(因此也是非决定论)。这将取决于Spark的内部结构。例如,它可能是响应驱动程序的第一个分区。该分区可能会随着网络、数据位置等而变化。

即使缓存了数据,我仍然不会依赖于每次都能取回相同的数据,尽管我当然希望它比从磁盘读取更一致。

Spark是惰性的,所以您采取的每个操作都会重新计算limit()返回的数据。如果底层数据被拆分到多个分区,那么每次对其进行评估时,限制可能是从不同的分区提取(即,如果您的数据存储在10个Parquet文件中,第一个限制调用可能从文件1提取,第二个调用可能从文件7提取,依此类推)。

来自Spark文档:

LIMIT子句用于约束SELECT语句返回的行数。通常,此子句与ORDER BY一起使用,以确保结果具有确定性。

因此,如果希望对.limit()的调用具有确定性,则需要预先对行进行排序。但是有一个陷阱!如果你按一列排序,而不是每一行都有唯一的值,那么所谓的";捆绑的";行(具有相同排序键值的行)将不具有确定性排序,因此.limit()可能仍然是不确定性的。

你有两个选择来解决这个问题:

  • 请确保在排序调用中包含唯一的行id
    例如CCD_ 10
    您可以这样定义rowId
    df = df.withColumn('rowId', func.monotonically_increasing_id())
  • 如果您在单次运行中只需要确定性结果,您可以简单地缓存限制df.limit(n).cache()的结果,这样至少该限制的结果不会因为连续的操作调用而更改,否则会重新计算limit的结果并使结果混乱

相关内容

  • 没有找到相关文章

最新更新