将Spark Dataframe Column转换为只有一行的Dataframe(ArrayType)



我有一个数据帧,其中包含一列包含复杂对象:

+--------+
|col1    |
+--------+
|object1 |
|object2 | 
|object3 |    
+--------+

这个对象的模式非常复杂,看起来像:

root:struct
field1:string
field2:decimal(38,18)
object1:struct
field3:string
object2:struct
field4:string
field5:decimal(38,18)

将所有内容分组并转换为数组的最佳方法是什么?

例如:

+-----------------------------+
|col1                         |
+-----------------------------+
| [object1, object2, object3] |    
+-----------------------------+

我试图从一列中生成一个数组,然后从中创建一个数据帧:

final case class A(b: Array[Any])
val c = df.select("col1").collect().map(_(0)).toArray
df.sparkSession.createDataset(Seq(A(b = c)))

然而,Spark不喜欢我的Array[Any]技巧:

java.lang.ClassNotFoundException: scala.Any

有什么想法吗?

对所有内容进行分组并将其转换为数组的最佳方法是什么?

甚至没有一个好的方法来做到这一点。请记住,Spark不能分发单独的行。结果将是:

  • 按顺序处理
  • 可能太大而无法存储在内存中

除上述之外,您可以只使用collect_list:

import org.apache.spark.sql.functions.{col, collect_list}
df.select(collect_list(col("col1"))

Spark对数据类型使用编码器,这就是Any不起作用的原因。

如果复杂对象的模式是固定的,则可以使用该模式定义case class,并执行以下操作:

case class C(... object1: A, object2: B ...)
val df = ???
val mappedDF = df.as[C] // this will map each complex object to case class

接下来,可以使用UDF在行级别上将每个C对象更改为Seq(...)。看起来像

import org.apache.spark.sql.expressions.{UserDefinedFunction => UDF}
import org.apache.spark.sql.functions.col
def convert: UDF =
udf((complexObj: C) => Seq(complexObj.object1,complexObj.object2,complexObj.object3))

要使用此UDF

mappedDF.withColumn("resultColumn", convert(col("col1")))

注意:由于没有提供太多关于模式的信息,我使用了像A和B这样的泛型。您必须定义所有这些。

最新更新