我知道如何将带有内部结构的结构从这里传递给 UDAF -在火花中将结构传递给 UDAF
但是,我如何处理内部结构架构未知或动态的情况,因为它会根据数据而变化。某些字段可能存在,也可能不存在,因为输入数据不符合特定架构。假设一个数据集有
root
|-- id:string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
|-- name: string (nullable = true)
而另一个数据集没有car3
root
|-- id:string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
|-- name: string (nullable = true)
如何编写接受根据输入数据更改的架构的 UDAF。
架构可以在初始化 Udaf 类时动态传递 -
val yetAnotherUdaf = new YetAnotherUdaf(schema)
case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction {
override def deterministic:Boolean=true
override def dataType:DataType=schema
override def inputSchema:StructType=schema
override def bufferSchema:StructType=schema
override def initialize(buffer:MutableAggregationBuffer):Unit={ ??? }
override def update(buffer:MutableAggregationBuffer, input:Row):Unit={ ??? }
override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={???}
override def evaluate(buffer:Row):StructType={ ??? }
}