假设我们有以下 Scala 程序:
val inputRDD = sc.textFile("log.txt")
inputRDD.persist()
val errorsRDD = inputRDD.filter(lambda x: "error" in x)
val warningsRDD = inputRDD.filter(lambda x: "warning" in x)
println("Errors: " + errorsRDD.count() + ", Warnings: " + warningsRDD.count())
我们创建一个简单的RDD,持久化它,在RDD上执行两个转换,最后有一个使用RDD的操作。
调用打印时,将执行转换,每个转换当然是并行的,具体取决于群集管理。
我的主要问题是:这两个动作和转换是并行执行还是按顺序执行?还是errorsRDD.count()
先按顺序执行,然后warningsRDD.count()
?
我也想知道在这个例子中使用 persist 是否有任何意义。
所有标准的RDD方法都是阻塞的(AsyncRDDActions
除外(,因此操作将按顺序进行评估。可以使用非阻塞提交(线程、Futures
(并发执行多个操作,并正确配置应用程序内调度程序或为每个操作显式限制资源。
关于cache
,如果不了解上下文就无法回答。根据群集配置、存储和数据局部性,再次从磁盘加载数据可能会更便宜,尤其是在资源有限且后续操作可能会触发缓存清理程序的情况下。
首先执行errorsRDD.count()
,然后执行warningsRDD.count()
。在这里使用 persist 的要点是,当执行第一个计数时,inputRDD 将在内存中。第二个计数,spark 不需要再次从存储中重新读取文件的"整个"内容,因此此计数的执行时间将比第一个要快得多。