Spark UDAF 动态输入架构处理



我知道如何将带有内部结构的结构从这里传递给 UDAF -在火花中将结构传递给 UDAF

但是,我如何处理内部结构架构未知或动态的情况,因为它会根据数据而变化。某些字段可能存在,也可能不存在,因为输入数据不符合特定架构。假设一个数据集有

   root
     |-- id:string (nullable = false)
     |-- age: long (nullable = true)
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
     |-- name: string (nullable = true)

而另一个数据集没有car3

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |-- name: string (nullable = true)

如何编写接受根据输入数据更改的架构的 UDAF。

架构可以在初始化 Udaf 类时动态传递 -

    val yetAnotherUdaf = new YetAnotherUdaf(schema)
    case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction {
      override def deterministic:Boolean=true
      override def dataType:DataType=schema
      override def inputSchema:StructType=schema
      override def bufferSchema:StructType=schema
      override def initialize(buffer:MutableAggregationBuffer):Unit={ ??? }
      override def update(buffer:MutableAggregationBuffer, input:Row):Unit={ ??? }
      override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={???}
      override def evaluate(buffer:Row):StructType={ ??? }
   }

相关内容

  • 没有找到相关文章

最新更新