PySpark .groupBy() 和 .count() 在相对较小的数据帧上很慢



好的 让我首先描述一下我是如何创建Dataframe以及其中的内容。

我有一组 gzip 的 HTML 文档和一组 gzip 的元数据到这些 HTML 文档

对于两者,我提供了如下所示的RDDs路径列表:

Wet_Paths_RDD = sc.parallelize(Wet_Paths)
Wet_RDD = Wet_Paths_RDD.map(open_wet_filelist).flatMap(split_wetfiles)

我准备两个RDD的方式是一行看起来像这样:

(k,(some,other,values))

然后我将元数据RDD与我的内容RDD连接在一起,如下所示:

Wat_Wet_RDD = Wat_RDD.join(Wet_RDD)

然后我解压缩了现在相对复杂的元组,并做了语言检测等事情。我必须执行RDDs的联接,因为到目前为止,我的所有字符串都表示为无法在数据帧中表示的byte strings

Wat_Wet_RDD = Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)

然后,我将连接的RDD转移到Dataframe

wat_wet_schema = StructType([
StructField("URI", StringType(), True),
StructField("Links", StringType(), True),
StructField("N_Links", IntegerType(), True),
StructField("Content_type", StringType(), True),
StructField("Original_Encoding", StringType(), True),
StructField("Content", StringType(), True),
StructField("Language", StringType(), True),
StructField("Language_confidence", IntegerType(), True),
])
WatWet_DF = sqlContext.createDataFrame(Wat_Wet_RDD, schema=wat_wet_schema)

并用以下内容查看它:

print(WatWet_DF.show(20))

到目前为止,一切都需要 24 分钟,但下一步:

print(WatWet_DF.groupBy(WatWet_DF.Language).count().orderBy(desc('count')).show(100))

我在 24 小时后中止,在这个阶段没有解决任何一项任务。

目前,我正在单个测试 linux VM 上运行群集。VM 有 4 个核心,并同时运行主服务器和辅助服务器。工人有 4 个刽子手,每个刽子手都有 3.5G 的内存。数据帧应包含大约 100 万行。Apache Spark版本是2.1.0,使用python 3.5。虚拟机运行在具有 24G RAM 的过时至强 W3680 6(v12)内核之上。

好的,所以我发现了为什么.count().groupBy()在这个数据集上花费的时间比.show()长得多。原因是,为了使.count().groupBy()提供所有结果,Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)需要将此处映射阶段执行的函数应用于整个数据集。对于提供结果.show(),这些函数只需要应用于整个数据集的子集,从而更快地提供结果。现在映射阶段,Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)有一些非常昂贵的功能,导致计算时间非常长,尤其是当.count().groupBy().show()进行比较时。

相关内容

  • 没有找到相关文章

最新更新