如何用java对spark数据集中的可选字段进行编码



我不想对数据集中使用的类的字段使用null值。我尝试使用scalaOption和javaOptional,但失败了:

@AllArgsConstructor // lombok
@NoArgsConstructor  // mutable type is required in java :(
@Data               // see https://stackoverflow.com/q/59609933/1206998
public static class TestClass {
String id;
Option<Integer> optionalInt;
}
@Test
public void testDatasetWithOptionField(){
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Option.apply(1)),
new TestClass("item .", Option.empty())
), Encoders.bean(TestClass.class));
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
}

运行时出现故障,并显示消息File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"


问题:有没有一种方法可以使用java在数据集中编码不带null的可选字段?

次要问题:顺便说一句,我在scala中也没有使用太多数据集,你能验证在scala中将包含Option字段的case类编码是否真的可能吗?


注意:这用于中间数据集,即既不读也不写的东西(但用于内部串行化(

这在Scala中非常简单。

Scala实现

import org.apache.spark.sql.{Encoders, SparkSession}
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("Stack-scala")
.master("local[2]")
.getOrCreate()
val ds = spark.createDataset(Seq(
TestClass("Item 1", Some(1)),
TestClass("Item 2", None)
))( Encoders.product[TestClass])
ds.collectAsList().forEach(println)
spark.stop()
}
case class TestClass(
id: String,
optionalInt: Option[Int] )
}

Java

Java中提供了各种Option类。然而,它们都不是开箱即用的。

  1. java.util.Optional:不可序列化
  2. scala.Option->可序列化但抽象,因此当CodeGenerator生成以下代码时,它会失败
/* 081 */         // initializejavabean(newInstance(class scala.Option))
/* 082 */         final scala.Option value_9 = false ?
/* 083 */         null : new scala.Option();  // ---> Such initialization is not possible for abstract classes
/* 084 */         scala.Option javaBean_1 = value_9;
  1. org.apache.spark.api.java.Optional->Spark对Optional的实现是可序列化的,但具有私有构造函数。因此,它失败了,并出现错误:没有为零实际参数找到适用的构造函数/方法。由于这是一个final类,因此不可能对此进行扩展
/* 081 */         // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */         final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */         null : new org.apache.spark.api.java.Optional();
/* 084 */         org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */         if (!false) {

一个选项是在数据类中使用普通的Java Optionals,然后使用Kryo作为序列化程序。

Encoder en = Encoders.kryo(TestClass.class);
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Optional.of(1)),
new TestClass("item .", Optional.empty())
), en);
ds.collectAsList().forEach(x -> System.out.println("Found " + x));

输出:

Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)

使用Kryo时有一个缺点:此编码器以二进制格式编码:

ds.printSchema();
ds.show(false);

打印

root
|-- value: binary (nullable = true)
+-------------------------------------------------------------------------------------------------------+
|value                                                                                                  |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00]   |
+-------------------------------------------------------------------------------------------------------+

一个基于udf的解决方案来获得用Kryo编码的数据集的正常输出列,描述了这个答案。


也许有点偏离主题,但找到长期解决方案的一个开始可能是查看JavaTypeInference的代码。ExpressionEncoder.javaBean使用方法serializerFor和反序列化器for来创建Java bean编码器的序列化器和反序列化程序部分。

在该模式匹配块中

typeToken.getRawType match {
case c if c == classOf[String] => createSerializerForString(inputObject)
case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
[...]

缺少对CCD_ 9的处理。它可能可以添加到这里,也可以添加到相应的反序列化方法中。这将允许Javabean具有Optional类型的属性。

最新更新