Spark unit test error on Window expressions



I am currently unit testing Spark in IntelliJ with DataFrameSuiteBase and SharedSparkContext.

Everything works fine on Windows as long as the Spark operations do not use the org.apache.spark.sql.expressions.Window object.

For example:

val accu = anosGroupees.select($"col1", $"avg(col2)",
  sum($"avg(col2)").over(
    Window.orderBy("col3").rowsBetween(Long.MinValue, -1)).as("mycolumn"))
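
For reference, here is a minimal self-contained version of the same pattern (toy data and made-up column names; on Spark 1.6 a Window expression needs a HiveContext, which is what pulls the Hive/Kryo code path into the test):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.hive.HiveContext

object WindowRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("window-repro"))
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    // Toy stand-in for anosGroupees: (col1, avg(col2), col3).
    val anosGroupees = sc.parallelize(Seq(
      ("a", 1.0, 1), ("b", 2.0, 2), ("c", 3.0, 3)
    )).toDF("col1", "avg(col2)", "col3")

    // Running sum of avg(col2) over all rows before the current one.
    val accu = anosGroupees.select($"col1", $"avg(col2)",
      sum($"avg(col2)").over(
        Window.orderBy("col3").rowsBetween(Long.MinValue, -1)).as("mycolumn"))

    accu.show()
    sc.stop()
  }
}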

The error is:

Task not serializable
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
        at org.apache.spark.sql.execution.Window.doExecute(Window.scala:245)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Coalesce.doExecute(basicOperators.scala:250)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:82)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:79)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to com.esotericsoftware.kryo.Kryo
        at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:155)
        at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:168)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 39 more

What do you think? Could this be a classpath issue? If I run the same code on a YARN cluster there is no problem; it only happens on Windows, in a Maven project. The dependencies in the pom are:

<dependencies>
    <dependency>
        <groupId>ai.h2o</groupId>
        <artifactId>sparkling-water-core_2.10</artifactId>
        <version>1.5.10</version>
    </dependency>
    <dependency>
        <groupId>ai.h2o</groupId>
        <artifactId>h2o-scala_2.10</artifactId>
        <version>3.8.0.3</version>
    </dependency>
    <dependency>
        <groupId>ai.h2o</groupId>
        <artifactId>h2o-app</artifactId>
        <version>3.8.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>15.0</version>
    </dependency>
    <dependency>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_2.10</artifactId>
        <version>2.2.6</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.14</version>
    </dependency>
    <dependency>
        <groupId>com.holdenkarau</groupId>
        <artifactId>spark-testing-base_2.10</artifactId>
        <version>1.5.2_0.3.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>2.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-csv</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.4</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark_2.10</artifactId>
        <version>2.1.2</version>
    </dependency>
</dependencies>

Regards, Brice

The root cause is the ClassCastException: org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to com.esotericsoftware.kryo.Kryo.
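
To see where each of the two Kryo classes is loaded from, a quick check like this (a sketch to run inside the failing test) prints the jar behind each class:

// Classpath diagnostic: the first line should point at the kryo jar,
// the second at the hive-exec jar that bundles the shaded copy.
println(classOf[com.esotericsoftware.kryo.Kryo]
  .getProtectionDomain.getCodeSource.getLocation)
println(Class.forName("org.apache.hive.com.esotericsoftware.kryo.Kryo")
  .getProtectionDomain.getCodeSource.getLocation)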

I ran into the same problem in Java.

The hive-exec jar bundles its own (shaded) Kryo implementation, while the HiveShim class in the spark-hive jar links against the other, unshaded Kryo class. A workaround in a unit test is to reassign runtimeSerializationKryo:

import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.junit.BeforeClass;

// For JUnit 4 you can use @BeforeClass.
@BeforeClass
public static void otherKryoImpl() {
    // com.esotericsoftware.kryo.Kryo << NOT the org.apache.hive shaded version.
    // The HiveShim class (spark-hive_2.10) expects com.esotericsoftware.kryo.Kryo
    // when org.apache.hadoop.hive.ql.exec.Utilities serializes the plan.
    // Raw ThreadLocal on purpose: the field's declared type uses the shaded Kryo,
    // so a typed ThreadLocal would not compile.
    Utilities.runtimeSerializationKryo = new ThreadLocal() {
        @Override
        protected synchronized Kryo initialValue() {
            Kryo kryo = new Kryo();
            // copy the registrations from the original implementation here...
            return kryo;
        }
    };
}
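
Since the question uses ScalaTest, the same reassignment can be done once in beforeAll. A sketch along these lines (untested here; the unchecked cast only silences the shaded type in the field's declared signature, generics being erased at runtime):

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hive.ql.exec.Utilities

def otherKryoImpl(): Unit = {
  // Install a ThreadLocal that yields the unshaded Kryo HiveShim expects.
  Utilities.runtimeSerializationKryo = new ThreadLocal[Kryo] {
    override def initialValue(): Kryo = {
      val kryo = new Kryo()
      // copy the registrations from the original implementation here...
      kryo
    }
  }.asInstanceOf[ThreadLocal[org.apache.hive.com.esotericsoftware.kryo.Kryo]]
}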

I had the same problem too.

I fixed it :-)

It resolved the classpath problem: just put the Spark lib directory in first position on the extra driver (and executor) classpath.
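
For reference, those are the spark.driver.extraClassPath / spark.executor.extraClassPath settings (entries are prepended to the classpath). A sketch, where /opt/spark/lib is a placeholder for the actual Spark lib directory; note the driver setting has to be supplied at launch time (e.g. via spark-submit --conf), not from inside an already-running JVM:

import org.apache.spark.SparkConf

// Put the Spark distribution's own jars first so they take precedence
// over conflicting copies pulled in by other dependencies.
// "/opt/spark/lib/*" is a placeholder path.
val conf = new SparkConf()
  .set("spark.driver.extraClassPath", "/opt/spark/lib/*")
  .set("spark.executor.extraClassPath", "/opt/spark/lib/*")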
