我正在将社交网络的json文件读取到Spark中。我从中得到一个数据框,我分解得到对。 这个过程非常完美。稍后我想将其转换为RDD(用于GraphX),但是RDD创建需要很长时间。
val social_network = spark.read.json(my/path) // 200MB
val exploded_network = social_network.
withColumn("follower", explode($"followers")).
withColumn("id_follower", ($"follower").cast("long")).
withColumn("id_account", ($"account").cast("long")).
withColumn("relationship", lit(1)).
select("id_follower", "id_account", "relationship")
val E1 = exploded_network.as[(VertexId, VertexId, Int)]
val E2 = E1.rdd
为了检查流程的运行方式,我在每个步骤都计数
scala> exploded_network.count
res0: Long = 18205814 // 3 seconds
scala> E1.count
res1: Long = 18205814 // 3 seconds
scala> E2.count // 5.4 minutes
res2: Long = 18205814
为什么RDD转换需要100倍?
在 Spark 中,数据帧是组织成命名列(表格格式)的分布式数据集合。它在概念上等同于关系数据库中的表或 R/Python 中的数据帧,但具有更丰富的优化。而且由于其表格格式,它具有元数据,允许 Spark 在后台运行大量优化。DataFrame API使用spark的高级优化,如钨执行引擎和催化剂优化器来更好地处理数据。
而在RDD中,RDD不会推断给定数据集的模式,而是要求用户提供任何模式。此外,Rdd不能利用Spark的优化器,如催化剂优化器和钨执行引擎(如上所述)。
因此,DataFrame的性能比RDD好得多。在您的情况下,如果您必须使用 RDD 而不是数据帧,我建议您在转换为 rdd 之前缓存数据帧。这应该可以提高您的rdd性能。
val E1 = exploded_network.cache()
val E2 = E1.rdd
希望这有帮助。