I have a DataFrame with the schema:
root
|-- label: string (nullable = true)
|-- features: struct (nullable = true)
| |-- feat1: string (nullable = true)
| |-- feat2: string (nullable = true)
| |-- feat3: string (nullable = true)
While I can filter the data frame using

val data = rawData
  .filter( !(rawData("features.feat1") <=> "100") )

I am unable to drop the column using
val data = rawData
  .drop("features.feat1")
Is it something that I am doing wrong here? I also tried (unsuccessfully) doing drop(rawData("features.feat1")), though it does not make much sense.

Thanks in advance,
Nikhil
It is just a programming exercise, but you can try something like this:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {

  // find the top-level field with the given name
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  // succeed only if that field is actually a struct
  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  // rebuild the struct from the fields that should be kept
  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  // replace the struct column with a copy lacking the dropped fields;
  // if anything fails (missing column, not a struct) return df unchanged
  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}
Example usage:
scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features
scala> case class record(label: String, features: features)
defined class record
scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]
scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
| label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+
scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
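A quick schema check in the same session confirms the field is really gone (note the rebuilt struct comes back as nullable = false here, since struct(...) builds a non-nullable column):

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).printSchema
root
|-- label: string (nullable = true)
|-- features: struct (nullable = false)
| |-- feat2: string (nullable = true)
| |-- feat3: string (nullable = true)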
This version allows you to remove nested columns at any level:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/**
 * Various Spark utilities and extensions of DataFrame
 */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
Usage:
import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")
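Applied to the question's schema, for instance, the call would be (a sketch, reusing the question's rawData variable):

val data = rawData.dropNestedColumn("features.feat1")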
For Spark 3.1+, you can use the dropFields method on struct-type columns:

An expression that drops fields in StructType by name. This is a no-op if the schema doesn't contain the field name(s).
val df = sql("SELECT named_struct('feat1', 1, 'feat2', 2, 'feat3', 3) features")
val df1 = df.withColumn("features", $"features".dropFields("feat1"))
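dropFields is variadic and also accepts dot-separated paths into deeper structs. A minimal sketch against the same df (the nested path in the comment is hypothetical):

val df2 = df.withColumn("features", $"features".dropFields("feat1", "feat2"))
// for deeper nesting, a dot-separated path works too, e.g.:
// df.withColumn("outer", $"outer".dropFields("inner.someField"))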
Expanding on spektom's answer, with support for array types:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                })
              : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
            // arrays of non-struct elements have no subfields to drop
            case _ => Some(col)
          }
        case other => Some(col)
      }
    } else {
      Some(col)
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
   * Extended version of DataFrame that allows to operate on nested fields
   */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}
Expanding on mmendez.semantic's answer here, and accounting for the issues described in the sub-thread.
def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
  if (fullColName.equals(dropColName)) {
    None
  } else if (dropColName.startsWith(s"$fullColName.")) {
    colType match {
      case colType: StructType =>
        Some(struct(
          colType.fields
            .flatMap(f =>
              dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                case Some(x) => Some(x.alias(f.name))
                case None => None
              })
            : _*))
      case colType: ArrayType =>
        colType.elementType match {
          case innerType: StructType =>
            // we are potentially dropping a column from within a struct, that is itself inside an array
            // Spark has some very strange behavior in this case, which they insist is not a bug
            // see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
            // and also the thread here: https://stackoverflow.com/a/39943812/375670
            // this is a workaround for that behavior

            // first, get all struct fields
            val innerFields = innerType.fields
            // next, create a new type for all the struct fields EXCEPT the column that is to be dropped
            // we will need this later
            val preserveNamesStruct = ArrayType(StructType(
              innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
            ))
            // next, apply dropSubColumn recursively to build up the new values after dropping the column
            val filteredInnerFields = innerFields.flatMap(f =>
              dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                case Some(x) => Some(x.alias(f.name))
                case None => None
              }
            )
            // finally, use arrays_zip to unwrap the arrays that were introduced by building up the new, filtered
            // struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
            // to get the original names back
            Some(arrays_zip(filteredInnerFields: _*).cast(preserveNamesStruct))
          // arrays of non-struct elements have no subfields to drop
          case _ => Some(col)
        }
      case _ => Some(col)
    }
  } else {
    Some(col)
  }
}

def dropColumn(df: DataFrame, colName: String): DataFrame = {
  df.schema.fields.flatMap(f => {
    if (colName.startsWith(s"${f.name}.")) {
      dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
        case Some(x) => Some((f.name, x))
        case None => None
      }
    } else {
      None
    }
  }).foldLeft(df.drop(colName)) {
    case (df, (colName, column)) => df.withColumn(colName, column)
  }
}
Usage in spark-shell:
// if defining the functions above in your spark-shell session, you first need imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// now you can paste the function definitions

// create a deeply nested and complex JSON structure
val jsonData = """{
  "foo": "bar",
  "top": {
    "child1": 5,
    "child2": [
      {
        "child2First": "one",
        "child2Second": 2,
        "child2Third": -19.51
      }
    ],
    "child3": ["foo", "bar", "baz"],
    "child4": [
      {
        "child2First": "two",
        "child2Second": 3,
        "child2Third": 16.78
      }
    ]
  }
}"""

// read it into a DataFrame
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())

// remove a sub-column
val modifiedDf = dropColumn(df, "top.child2.child2First")
modifiedDf.printSchema
root
|-- foo: string (nullable = true)
|-- top: struct (nullable = false)
| |-- child1: long (nullable = true)
| |-- child2: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
| |-- child3: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- child4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2First: string (nullable = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
modifiedDf.show(truncate=false)
+---+------------------------------------------------------+
|foo|top |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+
Another (PySpark) way would be to drop the features.feat1 column by creating features again:
from pyspark.sql.functions import col, arrays_zip

display(df
        .withColumn("features", arrays_zip("features.feat2", "features.feat3"))
        .withColumn("features", col("features").cast(schema))
)
Where schema is the new schema (excluding features.feat1):
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('feat2', StringType(), True),
        StructField('feat3', StringType(), True),
    ]
)
Following spektom's code snippet for Scala, I've created similar code in Java. Since Java 8 doesn't have foldLeft, I used forEachOrdered. This code is suitable for Spark 2.x (I'm using 2.1). I also noticed that dropping a column and then adding it back with withColumn under the same name doesn't work, so I'm just replacing the column instead, and that seems to work.

The code is not fully tested, hope it works :-)
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class DataFrameUtils {

    public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
        final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
        Arrays.stream(dataFrame.schema().fields())
            .flatMap(f -> {
                if (columnName.startsWith(f.name() + ".")) {
                    final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
                    if (column.isPresent()) {
                        return Stream.of(new Tuple2<>(f.name(), column));
                    } else {
                        return Stream.empty();
                    }
                } else {
                    return Stream.empty();
                }
            }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));

        return dataFrameFolder.getDF();
    }

    private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
        Optional<Column> column = Optional.empty();
        if (!fullColumnName.equals(dropColumnName)) {
            if (colType instanceof StructType) {
                if (dropColumnName.startsWith(fullColumnName + ".")) {
                    column = Optional.of(struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
                }
            } else {
                column = Optional.of(col);
            }
        }
        return column;
    }

    private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
        return Arrays.stream(colType.fields())
            .flatMap(f -> {
                final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                        fullColumnName + "." + f.name(), dropColumnName);
                if (column.isPresent()) {
                    return Stream.of(column.get().alias(f.name()));
                } else {
                    return Stream.empty();
                }
            }).toArray(Column[]::new);
    }

    private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
        private Dataset<Row> df;

        public DataFrameFolder(Dataset<Row> df) {
            this.df = df;
        }

        public Dataset<Row> getDF() {
            return df;
        }

        @Override
        public void accept(Tuple2<String, Optional<Column>> colTuple) {
            if (!colTuple._2().isPresent()) {
                df = df.drop(colTuple._1());
            } else {
                df = df.withColumn(colTuple._1(), colTuple._2().get());
            }
        }
    }
}
Example usage:
private class Pojo {
    private String str;
    private Integer number;
    private List<String> strList;
    private Pojo2 pojo2;

    public String getStr() {
        return str;
    }

    public Integer getNumber() {
        return number;
    }

    public List<String> getStrList() {
        return strList;
    }

    public Pojo2 getPojo2() {
        return pojo2;
    }
}

private class Pojo2 {
    private String str;
    private Integer number;
    private List<String> strList;

    public String getStr() {
        return str;
    }

    public Integer getNumber() {
        return number;
    }

    public List<String> getStrList() {
        return strList;
    }
}

SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");
Original structure:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = true)
| |-- number: integer (nullable = true)
| |-- str: string (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
After the drop:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = false)
| |-- number: integer (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
PySpark implementation:
from typing import List

import pyspark.sql.functions as sf
from pyspark.sql import Column, DataFrame
from pyspark.sql.types import StructType


def _drop_nested_field(
    schema: StructType,
    field_to_drop: str,
    parents: List[str] = None,
) -> Column:
    parents = list() if parents is None else parents
    src_col = lambda field_names: sf.col('.'.join(f'`{c}`' for c in field_names))

    if '.' in field_to_drop:
        root, subfield = field_to_drop.split('.', maxsplit=1)
        field_to_drop_from = next(f for f in schema.fields if f.name == root)

        return sf.struct(
            *[src_col(parents + [f.name]) for f in schema.fields if f.name != root],
            _drop_nested_field(
                schema=field_to_drop_from.dataType,
                field_to_drop=subfield,
                parents=parents + [root]
            ).alias(root)
        )
    else:
        # select all columns except the one to drop
        return sf.struct(
            *[src_col(parents + [f.name]) for f in schema.fields if f.name != field_to_drop],
        )


def drop_nested_field(
    df: DataFrame,
    field_to_drop: str,
) -> DataFrame:
    if '.' in field_to_drop:
        root, subfield = field_to_drop.split('.', maxsplit=1)
        field_to_drop_from = next(f for f in df.schema.fields if f.name == root)

        return df.withColumn(root, _drop_nested_field(
            schema=field_to_drop_from.dataType,
            field_to_drop=subfield,
            parents=[root]
        ))
    else:
        return df.drop(field_to_drop)


df = drop_nested_field(df, 'a.b.c.d')
Adding the Java version solution for this.

Utility class (pass your dataset and the nested column that has to be dropped to the dropNestedColumn function).

(There are a few bugs in Lior Chaga's answer; I have corrected them while trying to use his answer.)
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class NestedColumnActions {

    private static final String DOT = ".";

    /*
    dataset : dataset in which we want to drop columns
    columnName : nested column that needs to be deleted
    */
    public static Dataset<?> dropNestedColumn(Dataset<?> dataset, String columnName) {

        //Special case of top level column deletion
        if (!columnName.contains(DOT))
            return dataset.drop(columnName);

        final DataSetModifier dataFrameFolder = new DataSetModifier(dataset);
        Arrays.stream(dataset.schema().fields())
            .flatMap(f -> {
                //If the column name to be deleted starts with current top level column
                if (columnName.startsWith(f.name() + DOT)) {
                    //Get new column structure under f, expected after deleting the required column
                    final Optional<Column> column = dropSubColumn(functions.col(f.name()), f.dataType(), f.name(), columnName);
                    if (column.isPresent()) {
                        return Stream.of(new Tuple2<>(f.name(), column));
                    } else {
                        return Stream.empty();
                    }
                } else {
                    return Stream.empty();
                }
            })
            //Call accept function with Tuples of (top level column name, new column structure under it)
            .forEach(colTuple -> dataFrameFolder.accept(colTuple));

        return dataFrameFolder.getDataset();
    }

    private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
        Optional<Column> column = Optional.empty();
        if (!fullColumnName.equals(dropColumnName)) {
            if (colType instanceof StructType) {
                if (dropColumnName.startsWith(fullColumnName + DOT)) {
                    column = Optional.of(functions.struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
                } else {
                    column = Optional.of(col);
                }
            } else {
                column = Optional.of(col);
            }
        }
        return column;
    }

    private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
        return Arrays.stream(colType.fields())
            .flatMap(f -> {
                final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                        fullColumnName + DOT + f.name(), dropColumnName);
                if (column.isPresent()) {
                    return Stream.of(column.get().alias(f.name()));
                } else {
                    return Stream.empty();
                }
            }).toArray(Column[]::new);
    }

    private static class DataSetModifier implements Consumer<Tuple2<String, Optional<Column>>> {
        private Dataset<?> df;

        public DataSetModifier(Dataset<?> df) {
            this.df = df;
        }

        public Dataset<?> getDataset() {
            return df;
        }

        /*
        colTuple._1() : top level column name
        colTuple._2() : new column structure under it
        */
        @Override
        public void accept(Tuple2<String, Optional<Column>> colTuple) {
            if (!colTuple._2().isPresent()) {
                df = df.drop(colTuple._1());
            } else {
                df = df.withColumn(colTuple._1(), colTuple._2().get());
            }
        }
    }
}
The Make Structs Easy* library makes it easy to perform operations like adding, dropping, and renaming fields inside nested data structures. The library is available in both Scala and Python.

Assuming you have the following data:
import org.apache.spark.sql.functions._
case class Features(feat1: String, feat2: String, feat3: String)
case class Record(features: Features, arrayOfFeatures: Seq[Features])
val df = Seq(
  Record(
    Features("hello", "world", "!"),
    Seq(Features("red", "orange", "yellow"), Features("green", "blue", "indigo"))
  )
).toDF
df.printSchema
// root
// |-- features: struct (nullable = true)
// | |-- feat1: string (nullable = true)
// | |-- feat2: string (nullable = true)
// | |-- feat3: string (nullable = true)
// |-- arrayOfFeatures: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- feat1: string (nullable = true)
// | | |-- feat2: string (nullable = true)
// | | |-- feat3: string (nullable = true)
df.show(false)
// +-----------------+----------------------------------------------+
// |features |arrayOfFeatures |
// +-----------------+----------------------------------------------+
// |[hello, world, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +-----------------+----------------------------------------------+
Then dropping feat2 from features is as simple as:
import com.github.fqaiser94.mse.methods._
// drop feat2 from features
df.withColumn("features", $"features".dropFields("feat2")).show(false)
// +----------+----------------------------------------------+
// |features |arrayOfFeatures |
// +----------+----------------------------------------------+
// |[hello, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +----------+----------------------------------------------+
I noticed there were a lot of follow-up comments on other solutions asking whether there is a way to drop a column nested inside a struct that is itself nested inside an array. This can be done by combining the functions provided by the Make Structs Easy library with those provided by the spark-hofs library, as follows:
import za.co.absa.spark.hofs._
// drop feat2 in each element of arrayOfFeatures
df.withColumn("arrayOfFeatures", transform($"arrayOfFeatures", features => features.dropFields("feat2"))).show(false)
// +-----------------+--------------------------------+
// |features |arrayOfFeatures |
// +-----------------+--------------------------------+
// |[hello, world, !]|[[red, yellow], [green, indigo]]|
// +-----------------+--------------------------------+
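On Spark 3.1+, the array-of-structs case can also be handled without either library: the built-in transform function accepts a Column => Column lambda, and dropFields is part of the Column API (a sketch against the same df as above):

import org.apache.spark.sql.functions.transform

df.withColumn("arrayOfFeatures", transform($"arrayOfFeatures", features => features.dropFields("feat2"))).show(false)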
*Full disclosure: I am the author of the Make Structs Easy library that is referenced in this answer.
With Spark 3.1+, short and effective:
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col

object DatasetOps {
  implicit class DatasetOps[T](val dataset: Dataset[T]) {
    def dropFields(fieldNames: String*): DataFrame =
      fieldNames.foldLeft(dataset.toDF()) { (dataset, fieldName) =>
        // split "column.sub.field" into the top-level column name and the rest
        // (backslashes must be escaped in a plain string literal)
        val subFieldRegex = "(\\w+)\\.(.+)".r
        fieldName match {
          case subFieldRegex(columnName, subFieldPath) =>
            dataset.withColumn(columnName, col(columnName).dropFields(subFieldPath))
          case _ => dataset.drop(fieldName)
        }
      }
  }
}
This also preserves the nullability (required or not) flags in the schema.

Usage:
dataset.dropFields("some_column", "some_struct.some_sub_field.some_field")
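For reference, with these arguments the foldLeft expands to roughly the following (a sketch of what the helper does under the hood):

dataset.toDF()
  .drop("some_column")
  .withColumn("some_struct", col("some_struct").dropFields("some_sub_field.some_field"))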