Exception when using a UDT in a Spark DataFrame



I am trying to create a user-defined type in Spark SQL, but I get: com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType, even though I followed the example. Has anyone gotten this to work?

My code:

test("udt serialisation") {
    val points = Seq(new ExamplePoint(1.3, 1.6), new ExamplePoint(1.3, 1.8))
    val df = SparkContextForStdout.context.parallelize(points).toDF()
}

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[ExamplePointUDT])
case class ExamplePoint(val x: Double, val y: Double)

/**
 * User-defined type for [[ExamplePoint]].
 */
class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
  // Underlying SQL representation: a fixed-length array of doubles (x, y)
  override def sqlType: DataType = ArrayType(DoubleType, false)
  override def pyUDT: String = "pyspark.sql.tests.ExamplePointUDT"
  override def serialize(obj: Any): Seq[Double] = {
    obj match {
      case p: ExamplePoint =>
        Seq(p.x, p.y)
    }
  }
  override def deserialize(datum: Any): ExamplePoint = {
    datum match {
      case values: Seq[_] =>
        val xy = values.asInstanceOf[Seq[Double]]
        assert(xy.length == 2)
        new ExamplePoint(xy(0), xy(1))
      case values: util.ArrayList[_] =>
        val xy = values.asInstanceOf[util.ArrayList[Double]].asScala
        new ExamplePoint(xy(0), xy(1))
    }
  }
  override def userClass: Class[ExamplePoint] = classOf[ExamplePoint]
}

The relevant part of the stack trace is:

com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType
java.lang.ClassCastException: com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
    at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:254)

It seems the UDT only works when it is used inside another class (as the type of a field), so that the top-level schema is a StructType rather than the UDT itself. One way to use it directly is to wrap it in a Tuple1:

  test("udt serialisation") {
    val points = Seq(new Tuple1(new ExamplePoint(1.3, 1.6)), new Tuple1(new ExamplePoint(1.3, 1.8)))
    val df = SparkContextForStdout.context.parallelize(points).toDF()
    df.collect().foreach(println(_))
  }
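
The same workaround can be written with a small wrapper case class instead of Tuple1; a minimal sketch (the wrapper name PointWrapper and its field name are my own choices, not from the original code):

  // Hypothetical wrapper: any case class with an ExamplePoint field works, because
  // toDF() derives a StructType from the wrapper's fields and the ExamplePoint
  // column then carries the UDT, instead of the UDT being the top-level type.
  case class PointWrapper(point: ExamplePoint)

  test("udt serialisation via wrapper case class") {
    val points = Seq(PointWrapper(new ExamplePoint(1.3, 1.6)), PointWrapper(new ExamplePoint(1.3, 1.8)))
    val df = SparkContextForStdout.context.parallelize(points).toDF()
    df.collect().foreach(println(_))
  }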
