Spark UDAF:Java.lang.internalerror:畸形的类名称



我正在使用CDH 5.5.2发出的Spark 1.5.0。我从2.10.4切换到Scala 2.10.5。我正在为UDAF使用以下代码。这是某种程度上的字符串与UTF8String问题吗?如果是,任何帮助将不胜感激。

object GroupConcat extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }
    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}

但是,我在运行时收到此错误消息:

Exception in thread "main" java.lang.InternalError: Malformed class name
at java.lang.Class.getSimpleName(Class.java:1190)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.toString(udaf.scala:464)
at java.lang.String.valueOf(String.java:2847)
at java.lang.StringBuilder.append(StringBuilder.java:128)
at scala.StringContext.standardInterpolator(StringContext.scala:122)
at scala.StringContext.s(StringContext.scala:90)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2.toString(interfaces.scala:96)
at org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:174)
at org.apache.spark.sql.GroupedData$$anonfun$1.apply(GroupedData.scala:86)
at org.apache.spark.sql.GroupedData$$anonfun$1.apply(GroupedData.scala:80)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.GroupedData.toDF(GroupedData.scala:80)
at org.apache.spark.sql.GroupedData.agg(GroupedData.scala:227)

我收到了相同的例外,因为扩展了userDefinedAggregateFunction的对象在另一个函数中。

更改此内容:

object Driver {
    def main(args: Array[String]) {
        object GroupConcat extends UserDefinedAggregateFunction {
            ...
        }
    }
}

object Driver {
    def main(args: Array[String]) {
        ...
    }
    object GroupConcat extends UserDefinedAggregateFunction {
        ...
    }
}

我遇到了与我正在导入的软件包的冲突。如果要导入任何内容,请尝试在没有任何导入的火花壳上进行测试。

定义UDAF时,请检查返回的名称的样子。它应该像

FractionOfDayCoverage: org.apache.spark.sql.expressions.UserDefinedAggregateFunction{def dataType: org.apache.spark.sql.types.DoubleType.type; def evaluate(buffer: org.apache.spark.sql.Row): Double} = $anon$1@27506b6d

最终,$ anon $ 1@27506b6d是一个合理的名称。当我导入冲突的软件包时,返回的名称是3倍。这是一个示例:

$$$$bec6d1991b88c272b3efac29d720f546$$$$anon$1@6886601d

相关内容

  • 没有找到相关文章

最新更新