将结构传递给火花中的 UDAF



我有以下模式 -

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)
 |-- name: string (nullable = true)

如何将结构"汽车"传递给 udaf?如果我只想传递汽车子结构,输入模式应该是什么。

你可以

,但UDAF的逻辑会有所不同。例如,如果您有两行:

val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))
val rdd = spark.sparkContext.parallelize(seq)

这里的架构是

root
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)

然后,如果您尝试调用聚合:

val df = seq.toDF
df.agg(agg0(col("cars")))

您必须更改 UDAF 输入架构,如下所示:

val carsSchema =
    StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))

在你的 UDAF 男孩中,你必须处理这个模式来改变输入模式:

override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)

在更新方法中,您必须处理输入行的格式:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val i = input.getAs[Array[Array[String]]](0)
  // i here would be [car1,car2,car3],  an array of strings
  buffer(0) = ???
}

从这里,您可以转换 i 以更新缓冲区并完成合并和计算函数。

相关内容

  • 没有找到相关文章

最新更新