我正在使用spark-sql的DataFrame来实现一个通用的数据集成组件。基本思想,用户通过命名字段并将其与简单的SQL片段(可以出现在Select子句中的片段)映射来配置字段,组件添加此列并将它们分组到结构字段中(使用DSL列中的结构)。
稍后的处理会占用其中一些结构字段并将它们分组到一个数组中,此时我遇到了一个与其中一个字段在一个元组中可为空而在另一个元组中不可为空相关的问题。
由于字段分组在一个结构中,我能够提取结构类型,修改它并使用 Column.cast 方法将其应用回整个元组,我不确定这种方法是否适用于顶级字段(顺便说一句,SQL 强制转换语法不允许指定字段的可空性)。
我的问题是,有没有更好的方法来实现这一目标? 类似于 nullable() 函数的东西,可以应用于表达式以将其标记为可为空,类似于强制转换的工作方式。
示例代码:
val df = (1 to 8).map(x => (x,x+1)).toDF("x","y")
val df6 = df.select(
functions.struct( $"x" + 1 as "x1", $"y" + 1 as "y1" ) as "struct1",
functions.struct( $"x" + 1 as "x1", functions.lit(null).cast( DataTypes.IntegerType ) as "y1" ) as "struct2"
)
val df7 = df6.select( functions.array($"struct1", $"struct2") as "arr" )
此操作失败,并出现以下异常:
由于数据类型不匹配,无法解析"数组(结构 1,结构 2)": 函数数组的输入应该都是相同的类型,但它是 [结构,结构]; org.apache.spark.sql.AnalysisException: 无法解析 由于数据类型不匹配而导致的"array(struct1,struct2)":函数的输入 数组应该都是相同的类型,但它是 [struct, 结构];
修复程序如下所示:
//val df7 = df6.select( functions.array($"struct1", $"struct2") as "arr" )
val df7 = df6.select( functions.array($"struct1" cast df6.schema("struct2").dataType, $"struct2" ) as "arr" )
创建Option[Int]
的udf
使其更干净:
val optionInt = udf[Option[Int],Int](i => Option(i))
然后,当您为struct1
创建y1
时,您需要使用optionInt($"y" + 1)
。其他所有内容保持不变(尽管为简洁起见进行了编辑)。
val df6 = df.select(
struct($"x" + 1 as "x1", optionInt($"y" + 1) as "y1" ) as "struct1",
struct($"x" + 1 as "x1", lit(null).cast(IntegerType) as "y1" ) as "struct2"
)
然后df6.select(array($"struct1", $"struct2") as "arr" )
工作正常。