我用一些随机数学值填充RDD:
val itemFactors = rddItems.mapValues(newFactors =>
Vector(Array.fill(2){math.random})
)
然后,我将该RDD加入其他RDD并缓存它:
val finalRDD = itemFactors.join(rddItemsUsers).map{
case(itemid, (itemVector, ((userid, rating), userVector))) =>
(itemid, itemVector, userid, userVector, rating)}.cache
然后,我对finalRDD
:中保存的数据进行计算
sqrt(finalRDD.aggregate(0.0)((accum, item) =>
accum + pow(item._5 - item._4.dot(item._2), 2), _ + _) / finalRDD.count)
我从控制台反复调用代码的最后一部分sqrt(...)
,每次都会得到不同的结果——这是不需要的,因为我没有更改任何内容!这可以通过两种方式进行补救(即,使我得到一致的结果):
我可以用一个固定的数字填充数组,而不是用
math.random
初始化itemFactors
,例如1.0我可以做
itemFactors.cache
。
现在,我知道,由于沿袭,每次调用itemFactors
时,它都会调用math.random
并创建一个新的数字——因此,这将影响我执行计算时的情况。这就是为什么在填充数组时使用固定数字会产生一致的结果。
但是,最大的问题和我不理解的一点是:我正在缓存finalRDD
,这是执行计算的对象,因为它由itemFactors
组成,所以填充什么itemFactor
的数组当然无关紧要,因为节点只访问过一次?我以为我开始了解这个血统了;然而,这让我大吃一惊。
如果您的缓存无法放入内存,则会根据LRU策略丢失。
为了避免这种情况,你可以使用持久化,它接受所示的参数
val result = input.map(x => x*x)
result.persist(MEMORY_ONLY)
MEMORY_ONLY-将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,一些分区将不会被缓存,每次需要时都会动态重新计算。这是默认级别。
MEMORY_AND_DISK将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,请将不适合磁盘的分区存储起来,并在需要时从中读取。
MEMORY_ONLY_SER将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比反序列化的对象更节省空间,尤其是当使用快速序列化程序时,但读取更占用CPU。
MEMORY_AND_DISK_SER类似于MEMORY_ONLY_SER,但会将内存中不适合的分区溢出到磁盘,而不是在每次需要时动态重新计算它们。
DISK_ONLY仅在磁盘上存储RDD分区。MEMORY_ONLY_2、MEMORY_AND_DISK_2等。与上面的级别相同,但在两个集群节点上复制每个分区。
OFF_HEAP(实验)在Tachyon中以串行格式存储RDD。与MEMORY_ONLY_SER相比,OFF_HEAP减少了垃圾收集开销,并允许执行器更小并共享一个内存池,使其在具有大堆或多个并发应用程序的环境中具有吸引力。此外,由于RDD位于Tachyon中,执行器的崩溃不会导致丢失内存中的缓存。在这种模式下,快子中的内存是可丢弃的。因此,Tachyon不会试图重建它从内存中逐出的块。
有关的更多文档,请参阅此链接