我有一个Spark 1.5.0 DataFrame,在同一列中混合了null
和空字符串。我想将所有列中的所有空字符串转换为null
(Python中的None
)。DataFrame可能有数百列,所以我尽量避免对每个列进行硬编码操作。
见下面我的尝试,结果是一个错误。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
## Create a test DataFrame
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')])
testDF.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo| 1|
## | | 2|
## |null|null|
## +----+----+
## Try to replace an empty string with None/null
testDF.replace('', None).show()
## ValueError: value should be a float, int, long, string, list, or tuple
## A string value of null (obviously) doesn't work...
testDF.replace('', 'null').na.drop(subset='col1').show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo| 1|
## |null| 2|
## +----+----+
就是这么简单:
from pyspark.sql.functions import col, when
def blank_as_null(x):
return when(col(x) != "", col(x)).otherwise(None)
dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1"))
dfWithEmptyReplaced.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo| 1|
## |null| 2|
## |null|null|
## +----+----+
dfWithEmptyReplaced.na.drop().show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo| 1|
## +----+----+
如果你想要填充多个列,你可以使用reduce:
to_convert = set([...]) # Some set of columns
reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF)
或使用推导式:
exprs = [
blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns]
testDF.select(*exprs)
如果您想具体操作字符串字段,请检查robin-loxley的答案
udf的效率并不高。使用内置方法完成此操作的正确方法是:
df = df.withColumn('myCol', when(col('myCol') == '', None).otherwise(col('myCol')))
只需在zero323和soulmachine的答案上加上。为所有StringType字段进行转换。
from pyspark.sql.types import StringType
string_fields = []
for i, f in enumerate(test_df.schema.fields):
if isinstance(f.dataType, StringType):
string_fields.append(f.name)
我的解决方案比我迄今为止看到的所有解决方案都要好得多,它可以处理尽可能多的字段,参见下面的小函数:
// Replace empty Strings with null values
private def setEmptyToNull(df: DataFrame): DataFrame = {
val exprs = df.schema.map { f =>
f.dataType match {
case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name)
case _ => col(f.name)
}
}
df.select(exprs: _*)
}
你可以很容易地用Python重写上面的函数。
我是从@liancheng那里学来的
如果您正在使用python,您可以检查以下内容。
+----+-----+----+
| id| name| age|
+----+-----+----+
|null|name1| 50|
| 2| | |
| |name3|null|
+----+-----+----+
def convertToNull(dfa):
for i in dfa.columns:
dfa = dfa.withColumn(i , when(col(i) == '', None ).otherwise(col(i)))
return dfa
convertToNull(dfa).show()
+----+-----+----+
| id| name| age|
+----+-----+----+
|null|name1| 50|
| 2| null|null|
|null|name3|null|
+----+-----+----+
我会将trim
添加到@zero323的解决方案中,以处理多个空白的情况:
def blank_as_null(x):
return when(trim(col(x)) != "", col(x))
感谢@zero323, @Tomerikoo和@Robin Loxley
准备使用函数:
def convert_blank_to_null(df, cols=None):
from pyspark.sql.functions import col, when, trim
from pyspark.sql.types import StringType
def blank_as_null(x):
return when(trim(col(x)) == "", None).otherwise(col(x))
# Don't know how to parallel
for f in (df.select(cols) if cols else df).schema.fields:
if isinstance(f.dataType, StringType):
df = df.withColumn(f.name, blank_as_null(f.name))
return df
这有助于我对值进行净化。
所有列:
address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in address_df.columns]).distinct()
address_sanitize_df.show()
对于特定列:
sanitize_cols=["address_line2","zip4"]
address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in sanitize_cols])
address_sanitize_df.show()
这是一个不同版本的soulmachine的解决方案,但我不认为你可以很容易地将其转换为Python:
def emptyStringsToNone(df: DataFrame): DataFrame = {
df.schema.foldLeft(df)(
(current, field) =>
field.dataType match {
case DataTypes.StringType =>
current.withColumn(
field.name,
when(length(col(field.name)) === 0, lit(null: String)).otherwise(col(field.name))
)
case _ => current
}
)
}