How to concatenate all elements of a STRUCT with 1,000+ elements in Spark



I have a Spark DataFrame, shown below, that contains a struct field.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val arrayStructData = Seq(
  Row("James", Row("Java", "XX", 120)),
  Row("Michael", Row("Java", "", 200)),
  Row("Robert", Row("Java", "XZ", null)),
  Row("Washington", Row("", "XX", 120))
)
val arrayStructSchema = new StructType()
  .add("name", StringType)
  .add("my_struct", new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)

df.printSchema()
root
|-- name: string (nullable = true)
|-- my_struct: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- author: string (nullable = true)
|    |-- pages: integer (nullable = true)
df.show(false)
+----------+---------------+
|name      |my_struct      |
+----------+---------------+
|James     |[Java, XX, 120]|
|Michael   |[Java, , 200]  |
|Robert    |[Java, XZ,]    |
|Washington|[, XX, 120]    |
+----------+---------------+

I want to build an output column named final_list that records which elements are present or absent in the struct. The catch is that in this example the struct has only 3 elements, but in my real data the struct has 1,000 elements, and each record may or may not have a value for each of them.

Here is how I build the column today (df is registered as a temp view first so it can be queried with SQL):

df.createOrReplaceTempView("df")
val cleaned_df = spark.sql(s"""select name, case when my_struct.name = "" then "" else "name" end as name_present
, case when my_struct.author = "" then "" else "author" end as author_present
, case when my_struct.pages = "" then "" else "pages" end as pages_present
from df""")
cleaned_df.createOrReplaceTempView("cleaned_df")
cleaned_df.show(false)
+----------+------------+--------------+-------------+
|name      |name_present|author_present|pages_present|
+----------+------------+--------------+-------------+
|James     |name        |author        |pages        |
|Michael   |name        |              |pages        |
|Robert    |name        |author        |pages        |
|Washington|            |author        |pages        |
+----------+------------+--------------+-------------+

So I write one case statement per column to capture its presence or absence, then concatenate the flags as below to get the final output:

val final_df = spark.sql(s"""
select name, concat_ws("," , name_present, author_present, pages_present) as final_list
from cleaned_df
""")
final_df.show(false)
+----------+-----------------+
|name      |final_list       |
+----------+-----------------+
|James     |name,author,pages|
|Michael   |name,,pages      |
|Robert    |name,author,pages|
|Washington|,author,pages    |
+----------+-----------------+

I cannot hand-write a giant case statement covering a 1,000-element struct. Is there a smarter way to do this? Perhaps a UDF?

I am using Spark 2.4.3. I don't know whether any higher-order function supports this, but my real DataFrame's schema looks like this:

|-- name: string (nullable = true)
|-- my_struct: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- author: string (nullable = true)
|    |-- element3: integer (nullable = true)
|    |-- element4: string (nullable = true)
|    |-- element5: double (nullable = true)
.....
.....
|    |-- element1000: string (nullable = true)

You already mentioned a UDF. With a UDF you can iterate over all fields of my_struct and collect the flags:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Walk the struct's fields and keep the name of every field
// that is neither null nor an empty string.
def availableFields = (in: Row) => {
  val ret = scala.collection.mutable.ListBuffer.empty[String]
  for (i <- Range(0, in.size)) {
    if (!in.isNullAt(i) && in.get(i) != "") {
      ret += in.schema.fields(i).name
    }
  }
  ret.mkString(",")
}
val availableFieldsUdf = udf(availableFields)

df.withColumn("final_list", availableFieldsUdf(col("my_struct"))).show(false)

This prints:

+----------+---------------+-----------------+
|name      |my_struct      |final_list       |
+----------+---------------+-----------------+
|James     |[Java, XX, 120]|name,author,pages|
|Michael   |[Java, , 200]  |name,pages       |
|Robert    |[Java, XZ,]    |name,author      |
|Washington|[, XX, 120]    |author,pages     |
+----------+---------------+-----------------+
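If you also need the same check from SQL, the function can be registered under a name. A minimal sketch, assuming the availableFields function above; the name available_fields and the temp view df are illustrative choices, not part of the original answer:

// Register the Scala function so it can be called from Spark SQL.
// (Hypothetical name; any valid identifier works.)
spark.udf.register("available_fields", availableFields)

df.createOrReplaceTempView("df")
spark.sql("select name, available_fields(my_struct) as final_list from df").show(false)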

Without a UDF.

Schema

scala> df.printSchema
root
|-- name: string (nullable = true)
|-- my_struct: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- author: string (nullable = true)
|    |-- pages: integer (nullable = true)

Construct the expressions

scala> val expr = df
  .select("my_struct.*")                                       // extract the struct's columns
  .columns
  .map(c => (c, trim(col(s"my_struct.${c}"))))                 // build tuples like ("name", trim(col("my_struct.name")))
  .map(c => when(c._2.isNotNull and c._2 =!= "", lit(c._1)))   // keep the field name only when the value is neither null nor empty
expr: Array[org.apache.spark.sql.Column] = Array(CASE WHEN ((trim(my_struct.name) IS NOT NULL) AND (NOT (trim(my_struct.name) = ))) THEN name END, CASE WHEN ((trim(my_struct.author) IS NOT NULL) AND (NOT (trim(my_struct.author) = ))) THEN author END, CASE WHEN ((trim(my_struct.pages) IS NOT NULL) AND (NOT (trim(my_struct.pages) = ))) THEN pages END)

Apply the expressions to the DataFrame. Because when() without an otherwise() yields null for absent fields, and concat_ws skips nulls, missing entries simply drop out of the list:

scala> df.withColumn("final_list",concat_ws(",",expr:_*)).show
+----------+---------------+-----------------+
|      name|      my_struct|       final_list|
+----------+---------------+-----------------+
|     James|[Java, XX, 120]|name,author,pages|
|   Michael|  [Java, , 200]|       name,pages|
|    Robert|    [Java, XZ,]|      name,author|
|Washington|    [, XX, 120]|     author,pages|
+----------+---------------+-----------------+
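One caveat with this approach: trim is applied to every field, which relies on Spark implicitly casting non-string fields (such as the integer pages) to string. If you would rather make that distinction explicit, a sketch of a schema-aware variant is below. It reads the struct's field types and only applies the empty-string test to string fields; this is an assumption-laden variation on the answer above, not the answer itself:

import org.apache.spark.sql.functions.{col, concat_ws, lit, trim, when}
import org.apache.spark.sql.types.{StringType, StructType}

// Read the struct's field list from the DataFrame schema, then build one
// conditional per field: strings must be non-null and non-empty after
// trimming; every other type only needs to be non-null.
val structType = df.schema("my_struct").dataType.asInstanceOf[StructType]
val exprs = structType.fields.map { f =>
  val c = col(s"my_struct.${f.name}")
  val present = f.dataType match {
    case StringType => c.isNotNull and trim(c) =!= ""
    case _          => c.isNotNull
  }
  when(present, lit(f.name)) // null when absent; concat_ws drops nulls
}
df.withColumn("final_list", concat_ws(",", exprs: _*)).show(false)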
