编码后无法对自定义类型进行操作?Spark数据集



假设您有这个(编码自定义类型的解决方案来自这个线程(:

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

当做ds.show时,我得到了:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

我理解这是因为内容被编码到内部Spark SQL二进制表示中。但是我怎样才能像这样显示解码后的内容呢?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

更新1

显示内容并不是最大的问题,更重要的是,在处理数据集时,它可能会导致问题,例如:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 
ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value); 

这是否意味着,kryo编码类型不能像joinWith那样方便地进行操作?

我们如何在Dataset上处理自定义类型
如果我们在编码后无法处理它,那么这个kryo编码解决方案对自定义类型有什么意义?!

(下面@jacek提供的解决方案对于case class类型很好,但它仍然无法解码自定义类型(

以下内容对我有效,但似乎使用高级API来完成低级(反序列化(工作。

这并不是说应该这样做,而是表明这是可能的。

我不知道为什么KryoDeserializer不将字节反序列化到字节来自的对象。就是这样。

您的类定义和我的类定义之间的一个主要区别是这个case,它让我使用以下技巧。再说一遍,不知道为什么它会使它成为可能。

scala> println(spark.version)
3.0.1
// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]
val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
|-- value: binary (nullable = true)
scala> :type sc
org.apache.spark.SparkContext
// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] => 
import java.nio.ByteBuffer
ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }
val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

最新更新