我正在尝试测试Pyspark在一些非常大的(10s GB至1s tbs)数据上迭代的能力。对于大多数脚本,我发现Pyspark具有与Scala代码相同的效率。在其他情况下(如下面的代码),我会遇到严重的速度问题,范围从10到12倍。
path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext
df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))
.flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))
.filter(lambda r: r[-1] != None)
.countByValue()
print([((x[0], x[1], x[2]), y) for x, y in rddx.items()])
我们认为我们已经将问题隔离到.countbyvalue()(返回默认设备),但是应用CountItems()或rediceByKey()产生了几乎相同的结果。我们还确保问题不在99%,而不是提取域或createAvertexfromsourceurlandContent(不是函数的真实姓名,只是伪码以使其可以理解)。
所以我的问题是
- 此代码中有什么可以减少时间的方法?
- 从根本上是pyspark 慢得多对准?
- 是否有一种复制flatmap的方法改用Pyspark DataFrames(了解数据框是通常比Pyspark中的RDD快)?
这里最大的问题可能是通信 - spark sql(columnar format) ->普通的scala对象 ->泡菜(pyrolite) -> socket-> socket-> unckle-> unckle-> plain python对象。这是很多复制,转换和移动的事物。
有一种方法可以使用Pyspark DataFrames复制flatmap
是。它称为 explode
-但公平地说,也很慢。
了解数据框通常比PySpark中的RDD快
这通常是正确的(Scala和Python俩),但是您可能需要udf
实现ExtractDomain
或CreateAVertexFromSourceUrlAndContent
-这是另一回事。仅从您可以使用parse_url_tuple
的名称中。
Pyspark从根本上比Scala对应物慢得多?
这较慢。通常在调谐的代码上速度较慢。但是实现细节是不同的 - 在Scala和Python中,同一一组操作都可以以不同的方式实现。
该代码中有什么可以减少时间的方法?
我建议先分析。确定哪个部分负责(转换,合并)后,您可以尝试定位它。