Spark-递归功能,因为UDF会生成异常



我正在使用dataFrames,其中元素的模式类似于:

root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true) 
 |-- NPAHeader: struct (nullable = true)
 |    |    |-- npaNumber: string (nullable = true)
 |    |    |-- date: string (nullable = true)

在我的数据框架中,我想对具有相同NPAHeader.code的所有元素进行分组,因此,我使用以下行:

val groupedNpa = orderedNpa.groupBy($"NPAHeader.code" ).agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))

之后,我有一个带有以下模式的数据框:

StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))

每行的一个示例与以下内容相似:

[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]

现在,我想要的是生成另一个数据框,其中只有包装中的一个元素之一,所以我想要一个类似的输出:

[1234,npaNew]

注意:包裹段中所选的元素是在整个包装中迭代后匹配副本逻辑的元素。但是为了简化这个问题,我将始终接收包装的最后一个元素(迭代遍布所有内容)。

要这样做,我想定义一个重复的UDF

import org.apache.spark.sql.functions.udf
def returnRow(elementList : Row)(index:Int): Row = {
  val dif = elementList.size - index
  val row :Row = dif match{
    case 0 => elementList.getAs[Row](index)
    case _ => returnRow(elementList)(index + 1) 
  }
  row
} 
val returnRow_udf = udf(returnRow _)

groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}

,但我在地图中遇到以下错误:

线程" main" java.lang.unsupportedoperationException中的例外: 类型INT =>单位的模式不支持

我在做什么错?

顺便说一句,我不确定我是否使用groupedNpa("npa")正确传递了npa列。我正在将包裹段作为一行,因为我不知道如何通过 Array[Row]迭代( get(index)方法不存在在数组[row]中)

tl; dr 只需使用如何选择每个组的第一行中描述的一种方法?

如果要使用复杂的逻辑,并且返回Row可以跳过SQL API并使用groupByKey

val f: (String, Iterator[org.apache.spark.sql.Row]) => Row
val encoder: Encoder 
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder)

或更好:

val g: (Row, Row) => Row
df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g)

其中 encoder是有效的 RowEncoder(试图将数据框架行映射到更新的行时的编码错误)。

您的代码在多种方面有故障:

  • groupBy不能保证值的顺序。所以:

    orderBy(...).groupBy(....).agg(collect_list(...))
    

    可以具有非确定性输出。如果您真的决定走这条路线,则应跳过orderBy并明确对收集的数组进行排序。

  • 您不能将咖喱功能传递给udf。您必须首先取消它,但是它需要不同的参数(请参见下面的示例)。

  • 如果可以的话,这可能是调用它的正确方法(请注意,您省略了第二个参数):

    returnRow_udf(groupedNpa("npa")(0))
    

    更糟的是,您将其称为map中,其中udfs根本不适用。

  • udf无法返回Row。它必须返回外部Scala类型。

  • array<struct>的外部表示是Seq[Row]。您不能只用Row代替它。
  • SQL数组可以通过apply索引访问:

    df.select($"array"(size($"array") - 1))
    

    ,但由于非确定性而不是正确的方法。您可以应用sort_array,但正如一开始所指出的,有更有效的解决方案。

  • 令人惊讶的是,递归不是那么重要。您可以像这样设计功能:

    def size(i: Int=0)(xs: Seq[Any]): Int = xs match {
      case Seq() => i
      case null => i
      case Seq(h, t @ _*) => size(i + 1)(t)
    }
    val size_ = udf(size() _)
    

    它可以正常工作:

    Seq((1, Seq("a", "b", "c"))).toDF("id", "array")
      .select(size_($"array"))
    

    虽然递归是一个过度杀伤,但如果您只能在Seq上迭代。

相关内容

  • 没有找到相关文章

最新更新