在运行Spark应用程序时,我在催化剂深处遇到了错误。
例如:
java.lang.RuntimeException: scala.MatchError: LongType (of class org.apache.spark.sql.types.LongType$)
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$nullSafeCastFunction(Cast.scala:637)
org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:625)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
我在火花计划中将其缩小到以下内容:
Project [if (isnull(_rawTime#348L)) null else UDF(toTime(_rawTime#348L)) AS _time#438,
(请注意,我无法控制架构是否为 null,因为我从 Spark HBase 连接器获取此基础数据帧)
其中toTime
是 UDF 需要很长时间并生成时间戳。似乎催化剂无法匹配LongType
即使匹配语句具有:
case LongType => castToLongCode(from, ctx)
有趣的是,当我第一次运行它时,它工作正常。在第二次运行时,它有这个问题。
请注意,这是通过 apache Livy 运行的,因此执行之间的底层 Spark 会话应该相同。
我在工作开始时放置了以下代码。
logger.info("----------")
logger.info(LongType + " " + System.identityHashCode(LongType))
logger.info(DataTypes.LongType + " " + System.identityHashCode(DataTypes.LongType))
logger.info("Equal " + (DataTypes.LongType == LongType))
logger.info("----------")
然后运行它,我看到:
first run:
----------
LongType 1044985410
LongType 1044985410
Equal true
----------
second run:
----------
LongType 355475697
LongType 1044985410
Equal false
----------
您可以在运行 2 上看到,对 LongType 的基于对象的调用与第一次运行时的标识不同。
Spark的评论建议人们使用DataTypes中的单例。 例如。DataTypes.LongType
这是有道理的,因为它们似乎保持不变。但是,Spark自己的代码使用非单例。
长型定义为
/**
* @since 1.3.0
*/
@InterfaceStability.Stable
case object LongType extends LongType
虽然DataTypes.LongType
是
public static final DataType LongType = LongType$.MODULE$;
它指的是前者(案例对象)。单例保持不变是有道理的。事实上,火花代码说Please use the singleton
DataTypes.LongType.
。尽管内部 Spark 代码的负载不会这样做。 对我来说,这感觉就像一个错误。
Spark 中的 Scala 代码编译良好,然后随着类型的突然身份更改而失败,这似乎很奇怪。
所以我的问题是:
- 在 Spark 中使用
DataType
的建议是什么?我应该使用单例还是非单例? - 什么可能导致这个身份在我下面发生变化?
我已经解决了这个问题。
基本上所有的数据类型实例在 Scala 中都定义为:
* @since 1.3.0
*/
@InterfaceStability.Stable
case object LongType extends LongType
但。。。在许多地方,Spark使用Java代码,使用单例获取数据类型:
* Gets the LongType object.
*/
public static final DataType LongType = LongType$.MODULE$;
LongType$.MODULE$;
是如何从java land调用case对象。
但是我正在使用 Kryo 将DataType
序列化为 Livy,而 Kryo 正在内部重新初始化LongType$.MODULE$;
。 在 Scala 中,当你得到一个案例 Object 时,你得到的引用不是绑定到创建的第一个实例,而是绑定到最后一个创建的实例。
所以时间线是:
- 时间 0:
DataTypes.LongType
的引用为 1,LongType
的引用为 1 也。(其中ref
仅表示参考) - 时间 1:Kryo 反序列化,因此重新实例化对象。但是,单例 DataTypes.LongType 指向第一个实例。即
DataTypes.LongType
的参考文献为 1,LongType
的参考
文献为 2 - 时间>=2:混乱随之而来 - 数据类型无法通过相等性检查。
解决方案是不要以这种方式将大小写对象传递给 Kryo。可能是由于某种原因,我们没有正确使用Kryo,或者我们需要使用twitter/chill。