火花柱状性能



我是事物的亲戚初学者。我有一个宽的数据框(1000列),我想根据是否缺少值

将列添加到列中。

so

 ---- |A | ---- |1 | ---- | null | ---- |3 | ---- 

变成

 ----- ------- |A |a_mis | ----- ------- |1 |0 | ----- ------- | null |1 | ----- ------- |3 |1 | ----- ------- 

这是自定义ML变压器的一部分,但算法应清楚。

override def transform(dataset: org.apache.spark.sql.Dataset[_]): org.apache.spark.sql.DataFrame = {
  var ds = dataset
  dataset.columns.foreach(c => {
    if (dataset.filter(col(c).isNull).count() > 0) {
      ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
    }
  })

  ds.toDF()
}

在列上循环,如果> 0 nulls创建一个新列。

传递的数据集被缓存(使用.cache方法),而相关的配置设置为默认设置。现在,这是在一台笔记本电脑上运行的,即使行量最少,也可以在1000列的40分钟内运行。我认为问题是由于遇到数据库,因此我尝试使用一个镶木quet文件,而不是相同的结果。查看工作UI,似乎正在执行FilesCans以进行计数。

有什么方法可以改进该算法以获得更好的性能,还是以某种方式调整缓存?增加spark.sql.inmemorycolumnarstorage.batchsize只是给我一个oom错误。

删除条件:

if (dataset.filter(col(c).isNull).count() > 0) 

仅留下内部表达。由于编写,火花需要#Columns数据扫描。

如果您想要摘除列计算统计信息一次,如使用pyspark的每列列中的非nan条目的数量,并使用单个drop调用。

这是解决问题的代码。

override def transform(dataset: Dataset[_]): DataFrame = {
  var ds = dataset
  val rowCount = dataset.count()
  val exprs = dataset.columns.map(count(_))
  val colCounts = dataset.agg(exprs.head, exprs.tail: _*).toDF(dataset.columns: _*).first()
  dataset.columns.foreach(c => {
    if (colCounts.getAs[Long](c) > 0 && colCounts.getAs[Long](c) < rowCount   ) {
      ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
    }
  })
  ds.toDF()
}

相关内容

  • 没有找到相关文章