我想对我的数据进行一些预处理,我想删除稀疏的行(对于某些阈值)。
例如,我有一个包含 10 个特征的数据帧表,并且我有一行具有 8 个 null 值,然后我想删除它。
我找到了一些相关主题,但我找不到任何有用的信息。
stackoverflow.com/questions/3473778/count-number-of-nulls-in-a-row
上面链接中的示例对我不起作用,因为我想自动进行此预处理。我无法编写列名并相应地执行某些操作。
那么,是否可以在不将 Apache Spark 中的列名与 scala 一起使用的情况下执行此删除操作?
惊讶没有答案指出Spark SQL带有很少满足要求的标准函数:
例如,我有一个包含 10 个特征的数据帧表,并且我有一行具有 8 个 null 值,然后我想删除它。
您可以使用DataFrameNaFunctions.drop方法的变体之一,并适当设置minNonNulls
例如2。
drop(minNonNulls: Int, cols: Seq[String]): DataFrame 返回一个新的数据帧,该数据帧删除指定列中包含小于 minNonNulls 的非 null 和非 NaN 值的行。
并且为了满足列名的可变性,如要求:
我无法编写列名并相应地执行某些操作。
您可以简单地使用 Dataset.columns:
列:数组[字符串] 以数组形式返回所有列名。
假设您有以下数据集,其中包含 5 个特征(列)和几乎null
几行。
val ns: String = null
val features = Seq(("0","1","2",ns,ns), (ns, ns, ns, ns, ns), (ns, "1", ns, "2", ns)).toDF
scala> features.show
+----+----+----+----+----+
| _1| _2| _3| _4| _5|
+----+----+----+----+----+
| 0| 1| 2|null|null|
|null|null|null|null|null|
|null| 1|null| 2|null|
+----+----+----+----+----+
// drop rows with more than (5 columns - 2) = 3 nulls
scala> features.na.drop(2, features.columns).show
+----+---+----+----+----+
| _1| _2| _3| _4| _5|
+----+---+----+----+----+
| 0| 1| 2|null|null|
|null| 1|null| 2|null|
+----+---+----+----+----+
考试日期:
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(new Document(null, null, null), new Document("a", null, null), new Document("a", "b", null), new Document("a", "b", "c"), new Document(null, null, "c"))).df
使用UDF
重新混合大卫的答案和我在下面的RDD版本,你可以使用一个行的UDF来完成:
def nullFilter = udf((x:Row) => {Range(0, x.length).count(x.isNullAt(_)) < 2})
df.filter(nullFilter(struct(df.columns.map(df(_)) : _*))).show
使用RDD
您可以将其转换为 rdd,即行中列的循环,并计算有多少为空。
sqlContext.createDataFrame(df.rdd.filter( x=> Range(0, x.length).count(x.isNullAt(_)) < 2 ), df.schema).show
使用 UDF 更干净:
import org.apache.spark.sql.functions.udf
def countNulls = udf((v: Any) => if (v == null) 1; else 0;))
df.registerTempTable("foo")
sqlContext.sql(
"select " + df.columns.mkString(", ") + ", " + df.columns.map(c => {
"countNulls(" + c + ")"
}).mkString(" + ") + "as nullCount from foo"
).filter($"nullCount" > 8).show
如果制作查询字符串让您感到紧张,那么您可以尝试以下操作:
var countCol: org.apache.spark.sql.Column = null
df.columns.foreach(c => {
if (countCol == null) countCol = countNulls(col(c))
else countCol = countCol + countNulls(col(c))
});
df.select(Seq(countCol as "nullCount") ++ df.columns.map(c => col(c)):_*)
.filter($"nullCount" > 8)
以下是Spark 2.0中的替代方案:
val df = Seq((null,"A"),(null,"B"),("1","C"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Int"))
df.show()
+----+---+
| foo|bar|
+----+---+
|null| A|
|null| B|
| 1| C|
+----+---+
df.where('foo.isNull).groupBy('foo).count().show()
+----+-----+
| foo|count|
+----+-----+
|null| 2|
+----+-----+