有什么方法可以提高Pyspark输出的效率



我正在尝试测试Pyspark在一些非常大的(10s GB至1s tbs)数据上迭代的能力。对于大多数脚本,我发现Pyspark具有与Scala代码相同的效率。在其他情况下(如下面的代码),我会遇到严重的速度问题,范围从10到12倍。

path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext   
df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))
 .filter(lambda r: r[-1] != None)
 .countByValue()
print([((x[0], x[1], x[2]), y) for x, y in rddx.items()]) 

我们认为我们已经将问题隔离到.countbyvalue()(返回默认设备),但是应用CountItems()或rediceByKey()产生了几乎相同的结果。我们还确保问题不在99%,而不是提取域或createAvertexfromsourceurlandContent(不是函数的真实姓名,只是伪码以使其可以理解)。

所以我的问题是

  1. 此代码中有什么可以减少时间的方法?
  2. 从根本上是pyspark 慢得多对准?
  3. 是否有一种复制flatmap的方法改用Pyspark DataFrames(了解数据框是通常比Pyspark中的RDD快)?

这里最大的问题可能是通信 - spark sql(columnar format) ->普通的scala对象 ->泡菜(pyrolite) -> socket-> socket-> unckle-> unckle-> plain python对象。这是很多复制,转换和移动的事物。

有一种方法可以使用Pyspark DataFrames复制flatmap

是。它称为 explode-但公平地说,也很慢。

了解数据框通常比PySpark中的RDD快

这通常是正确的(Scala和Python俩),但是您可能需要udf实现ExtractDomainCreateAVertexFromSourceUrlAndContent-这是另一回事。仅从您可以使用parse_url_tuple的名称中。

Pyspark从根本上比Scala对应物慢得多?

这较慢。通常在调谐的代码上速度较慢。但是实现细节是不同的 - 在Scala和Python中,同一一组操作都可以以不同的方式实现。

该代码中有什么可以减少时间的方法?

我建议先分析。确定哪个部分负责(转换,合并)后,您可以尝试定位它。

相关内容

  • 没有找到相关文章

最新更新