Apache Spark RDD缓存和Lineage混淆



我用一些随机数学值填充RDD:

val itemFactors = rddItems.mapValues(newFactors => 
Vector(Array.fill(2){math.random})
)

然后,我将该RDD加入其他RDD并缓存它:

val finalRDD = itemFactors.join(rddItemsUsers).map{
case(itemid, (itemVector, ((userid, rating), userVector))) => 
(itemid, itemVector, userid, userVector, rating)}.cache

然后,我对finalRDD:中保存的数据进行计算

sqrt(finalRDD.aggregate(0.0)((accum, item) => 
accum + pow(item._5 - item._4.dot(item._2), 2), _ + _) / finalRDD.count)

我从控制台反复调用代码的最后一部分sqrt(...),每次都会得到不同的结果——这是不需要的,因为我没有更改任何内容!这可以通过两种方式进行补救(即,使我得到一致的结果):

  • 我可以用一个固定的数字填充数组,而不是用math.random初始化itemFactors,例如1.0

  • 我可以做itemFactors.cache

现在,我知道,由于沿袭,每次调用itemFactors时,它都会调用math.random并创建一个新的数字——因此,这将影响我执行计算时的情况。这就是为什么在填充数组时使用固定数字会产生一致的结果。

但是,最大的问题和我不理解的一点是:我正在缓存finalRDD,这是执行计算的对象,因为它由itemFactors组成,所以填充什么itemFactor的数组当然无关紧要,因为节点只访问过一次?我以为我开始了解这个血统了;然而,这让我大吃一惊。

如果您的缓存无法放入内存,则会根据LRU策略丢失。

为了避免这种情况,你可以使用持久化,它接受所示的参数

val result = input.map(x => x*x)
result.persist(MEMORY_ONLY)

MEMORY_ONLY-将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,一些分区将不会被缓存,每次需要时都会动态重新计算。这是默认级别。

MEMORY_AND_DISK将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,请将不适合磁盘的分区存储起来,并在需要时从中读取。

MEMORY_ONLY_SER将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比反序列化的对象更节省空间,尤其是当使用快速序列化程序时,但读取更占用CPU。

MEMORY_AND_DISK_SER类似于MEMORY_ONLY_SER,但会将内存中不适合的分区溢出到磁盘,而不是在每次需要时动态重新计算它们。

DISK_ONLY仅在磁盘上存储RDD分区。MEMORY_ONLY_2、MEMORY_AND_DISK_2等。与上面的级别相同,但在两个集群节点上复制每个分区。

OFF_HEAP(实验)在Tachyon中以串行格式存储RDD。与MEMORY_ONLY_SER相比,OFF_HEAP减少了垃圾收集开销,并允许执行器更小并共享一个内存池,使其在具有大堆或多个并发应用程序的环境中具有吸引力。此外,由于RDD位于Tachyon中,执行器的崩溃不会导致丢失内存中的缓存。在这种模式下,快子中的内存是可丢弃的。因此,Tachyon不会试图重建它从内存中逐出的块。

有关的更多文档,请参阅此链接

最新更新