我正在使用从某个数据帧df
创建一个数据样本
rdd = df.limit(10000).rdd
这个操作需要相当长的时间(实际上为什么?它不能在10000行之后缩短吗?),所以我假设我现在有了一个新的RDD。
然而,当我现在处理rdd
时,每次访问它都是不同的行。就好像它会重新采样一样。缓存RDD有点帮助,但这肯定不是节省吗?
背后的原因是什么?
更新:这是Spark 1.5.2 的复制品
from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
print(rdd1.map(lambda row:row.i).reduce(add))
输出为
499500
19955500
49651500
我很惊讶.rdd
没有修复数据。
编辑:为了表明它比重新执行问题更棘手,这里有一个在Spark 2.0.0.2.5.0 上产生错误结果的单一操作
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join
基本上,无论何时使用limit
,结果都可能是错误的。我的意思不是"只是众多样本中的一个",而是非常不正确的(因为在这种情况下,结果应该总是12345)。
因为Spark是分布式的,所以通常假设确定性结果是不安全的。您的示例是以DataFrame的"第一个"10000行为例。在这里,"第一"的含义存在歧义(因此也是非决定论)。这将取决于Spark的内部结构。例如,它可能是响应驱动程序的第一个分区。该分区可能会随着网络、数据位置等而变化。
即使缓存了数据,我仍然不会依赖于每次都能取回相同的数据,尽管我当然希望它比从磁盘读取更一致。
Spark是惰性的,所以您采取的每个操作都会重新计算limit()返回的数据。如果底层数据被拆分到多个分区,那么每次对其进行评估时,限制可能是从不同的分区提取(即,如果您的数据存储在10个Parquet文件中,第一个限制调用可能从文件1提取,第二个调用可能从文件7提取,依此类推)。
来自Spark文档:
LIMIT
子句用于约束SELECT
语句返回的行数。通常,此子句与ORDER BY
一起使用,以确保结果具有确定性。
因此,如果希望对.limit()
的调用具有确定性,则需要预先对行进行排序。但是有一个陷阱!如果你按一列排序,而不是每一行都有唯一的值,那么所谓的";捆绑的";行(具有相同排序键值的行)将不具有确定性排序,因此.limit()
可能仍然是不确定性的。
你有两个选择来解决这个问题:
- 请确保在排序调用中包含唯一的行id
例如CCD_ 10
您可以这样定义rowId
:df = df.withColumn('rowId', func.monotonically_increasing_id())
- 如果您在单次运行中只需要确定性结果,您可以简单地缓存限制
df.limit(n).cache()
的结果,这样至少该限制的结果不会因为连续的操作调用而更改,否则会重新计算limit
的结果并使结果混乱