我是事物的亲戚初学者。我有一个宽的数据框(1000列),我想根据是否缺少值
将列添加到列中。so
---- |A | ---- |1 | ---- | null | ---- |3 | ----
变成
----- ------- |A |a_mis | ----- ------- |1 |0 | ----- ------- | null |1 | ----- ------- |3 |1 | ----- -------
这是自定义ML变压器的一部分,但算法应清楚。
override def transform(dataset: org.apache.spark.sql.Dataset[_]): org.apache.spark.sql.DataFrame = {
var ds = dataset
dataset.columns.foreach(c => {
if (dataset.filter(col(c).isNull).count() > 0) {
ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
}
})
ds.toDF()
}
在列上循环,如果> 0 nulls创建一个新列。
传递的数据集被缓存(使用.cache方法),而相关的配置设置为默认设置。现在,这是在一台笔记本电脑上运行的,即使行量最少,也可以在1000列的40分钟内运行。我认为问题是由于遇到数据库,因此我尝试使用一个镶木quet文件,而不是相同的结果。查看工作UI,似乎正在执行FilesCans以进行计数。
有什么方法可以改进该算法以获得更好的性能,还是以某种方式调整缓存?增加spark.sql.inmemorycolumnarstorage.batchsize只是给我一个oom错误。
删除条件:
if (dataset.filter(col(c).isNull).count() > 0)
仅留下内部表达。由于编写,火花需要#Columns数据扫描。
如果您想要摘除列计算统计信息一次,如使用pyspark的每列列中的非nan条目的数量,并使用单个drop
调用。
这是解决问题的代码。
override def transform(dataset: Dataset[_]): DataFrame = {
var ds = dataset
val rowCount = dataset.count()
val exprs = dataset.columns.map(count(_))
val colCounts = dataset.agg(exprs.head, exprs.tail: _*).toDF(dataset.columns: _*).first()
dataset.columns.foreach(c => {
if (colCounts.getAs[Long](c) > 0 && colCounts.getAs[Long](c) < rowCount ) {
ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
}
})
ds.toDF()
}