Spark 内置类型的 Spark 数据类型相等性问题



在运行Spark应用程序时,我在催化剂深处遇到了错误。

例如:

java.lang.RuntimeException: scala.MatchError: LongType (of class org.apache.spark.sql.types.LongType$)
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$nullSafeCastFunction(Cast.scala:637)
org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:625)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)

我在火花计划中将其缩小到以下内容:

Project [if (isnull(_rawTime#348L)) null else UDF(toTime(_rawTime#348L)) AS _time#438,

(请注意,我无法控制架构是否为 null,因为我从 Spark HBase 连接器获取此基础数据帧)

其中toTime是 UDF 需要很长时间并生成时间戳。似乎催化剂无法匹配LongType即使匹配语句具有:

case LongType => castToLongCode(from, ctx)

有趣的是,当我第一次运行它时,它工作正常。在第二次运行时,它有这个问题。

请注意,这是通过 apache Livy 运行的,因此执行之间的底层 Spark 会话应该相同。

我在工作开始时放置了以下代码。

logger.info("----------")
logger.info(LongType + " " + System.identityHashCode(LongType))
logger.info(DataTypes.LongType + " " + System.identityHashCode(DataTypes.LongType))
logger.info("Equal " + (DataTypes.LongType == LongType))
logger.info("----------")

然后运行它,我看到:

first run:
----------
LongType 1044985410
LongType 1044985410
Equal true
----------
second run:
----------
LongType 355475697
LongType 1044985410
Equal false
----------

您可以在运行 2 上看到,对 LongType 的基于对象的调用与第一次运行时的标识不同。

Spark的评论建议人们使用DataTypes中的单例。 例如。DataTypes.LongType这是有道理的,因为它们似乎保持不变。但是,Spark自己的代码使用非单例。

长型定义为

/**
* @since 1.3.0
*/
@InterfaceStability.Stable
case object LongType extends LongType

虽然DataTypes.LongType

public static final DataType LongType = LongType$.MODULE$;

它指的是前者(案例对象)。单例保持不变是有道理的。事实上,火花代码说Please use the singletonDataTypes.LongType.。尽管内部 Spark 代码的负载不会这样做。 对我来说,这感觉就像一个错误。

Spark 中的 Scala 代码编译良好,然后随着类型的突然身份更改而失败,这似乎很奇怪。

所以我的问题是:

  • 在 Spark 中使用DataType的建议是什么?我应该使用单例还是非单例?
  • 什么可能导致这个身份在我下面发生变化?

我已经解决了这个问题。

基本上所有的数据类型实例在 Scala 中都定义为:

* @since 1.3.0
*/
@InterfaceStability.Stable
case object LongType extends LongType

但。。。在许多地方,Spark使用Java代码,使用单例获取数据类型:

* Gets the LongType object.
*/
public static final DataType LongType = LongType$.MODULE$;

LongType$.MODULE$;是如何从java land调用case对象。

但是我正在使用 Kryo 将DataType序列化为 Livy,而 Kryo 正在内部重新初始化LongType$.MODULE$;。 在 Scala 中,当你得到一个案例 Object 时,你得到的引用不是绑定到创建的第一个实例,而是绑定到最后一个创建的实例。

所以时间线是:

  • 时间 0:DataTypes.LongType的引用为 1,LongType的引用为 1 也。(其中ref仅表示参考)
  • 时间 1:Kryo 反序列化,因此重新实例化对象。但是,单例 DataTypes.LongType 指向第一个实例。即DataTypes.LongType的参考文献为 1,LongType的参考
  • 文献为 2
  • 时间>=2:混乱随之而来 - 数据类型无法通过相等性检查。

解决方案是不要以这种方式将大小写对象传递给 Kryo。可能是由于某种原因,我们没有正确使用Kryo,或者我们需要使用twitter/chill。

相关内容

  • 没有找到相关文章

最新更新