如何修复java.lang.ClassCastException:无法将scala.collection.immutab



这个错误是最难跟踪的。我不知道发生了什么事。我在我的定位机上运行一个Spark集群。所以整个spark集群在一个主机127.0.0.1下,我在独立模式下运行

JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
   .select("rowkey", "col1", "col2", "col3",  )
   .spanBy(new Function<CassandraRow, byte[]>() {
        @Override
        public byte[] call(CassandraRow v1) {
            return v1.getBytes("rowkey").array();
        }
    }, byte[].class);
Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
    System.out.println("************START************");
    System.out.println(new String(partitionKey));
    System.out.println("************END************");
}

这个错误是最难追踪的。这显然发生在cassandraRowsRDD.collect(),我不知道为什么?

16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

以下是我使用的版本

Scala code runner version 2.11.8  // when I run scala -version or even ./spark-shell

compile group: 'org.apache.spark' name: 'spark-core_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-streaming_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-sql_2.11' version: '2.0.0'
compile group: 'com.datastax.spark' name: 'spark-cassandra-connector_2.11' version: '2.0.0-M3': 

我的gradle文件看起来像这样,在引入了一个叫做"提供"的东西之后,它实际上似乎不存在,但谷歌说要创建一个,所以我的构建。Gradle是这样的

group 'com.company'
version '1.0-SNAPSHOT'
apply plugin: 'java'
apply plugin: 'idea'
repositories {
    mavenCentral()
    mavenLocal()
}
configurations {
    provided
}
sourceSets {
    main {
        compileClasspath += configurations.provided
        test.compileClasspath += configurations.provided
        test.runtimeClasspath += configurations.provided
    }
}
idea {
    module {
        scopes.PROVIDED.plus += [ configurations.provided ]
    }
}
dependencies {
    compile 'org.slf4j:slf4j-log4j12:1.7.12'
    provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
    provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'
}

jar {
    from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } }
   // with jar
    from sourceSets.test.output
    manifest {
        attributes 'Main-Class': "com.company.batchprocessing.Hello"
    }
    exclude 'META-INF/.RSA', 'META-INF/.SF', 'META-INF/*.DSA'
    zip64 true
}

我有同样的问题,可以通过使用

将应用程序的jar添加到spark的类路径来解决这个问题
spark = SparkSession.builder()
        .appName("Foo")
        .config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar")

我遇到了同样的异常,并深入研究了多个相关的Jiras(9219, 12675, 18075)。

我认为异常的名称是令人困惑的,真正的问题是spark集群和驱动应用程序之间的不一致的环境设置

例如,我在conf/spark-defaults.conf中使用以下行启动我的Spark集群:

spark.master                     spark://master:7077

当我用一行启动我的驱动程序时(即使程序是用spark-submit启动的):

sparkSession.master("spark://<master ip>:7077")

,其中<master ip>是节点master的正确IP地址,但是程序会因为这个简单的不一致而失败。

因此,我建议所有驱动程序应用程序都以spark-submit启动,并且不要复制驱动程序代码中的任何配置(除非您需要覆盖某些配置)。也就是说,让spark-submit在运行的Spark集群中以相同的方式设置您的环境。

在我的情况下,我不得不添加spark-avro jar(我把它放在/lib文件夹旁边的主jar):

SparkSession spark = SparkSession.builder().appName("myapp").getOrCreate();
...
spark.sparkContext().addJar("lib/spark-avro_2.11-4.0.0.jar");

call()方法应该像下面这样返回byte[]。

@Override
public byte[] call(CassandraRow v1) {
  return v1.getBytes("rowkey").array();
}

如果你仍然遇到这个问题,那么检查Jira中提到的依赖的版本https://issues.apache.org/jira/browse/SPARK-9219

检查你的代码-在Intellij:分析…->检查代码。如果您已经弃用了与序列化相关的方法,请修复它。或者干脆尝试减少Spark到Scala的版本。在我的例子中,我将Scala版本降低到2.10,并且一切正常。

我有同样的问题,而在eclipse运行我的工作在spark集群的一个节点,这是ubuntu盒子。我将UDF创建为一个单独的java类。当在本地运行spark时,一切都很好,但转向yarn会抛出与问题相同的异常。

我通过将生成的类的路径放在spark classpath中来解决这个问题,其中包括类似Holger Brandl建议的UDF类。

我为classpath创建了一个变量:
String cpVar = "..../target/classes"

并添加到spark作为配置变量:

.config("spark.driver.extraClassPath", cpVar)
.config("spark.executorEnv.CLASSPATH", cpVar)
编辑:

将路径添加到classpath中只解决了驱动节点的问题,集群中的其他节点仍然可能有相同的错误。我得到的最终解决方案是在每次构建后将生成的类放入hdfs,并将类路径设置为spark的hdfs文件夹,如下所示。

sparkSession.sparkContext().addJar("hdfs:///user/.../classes");

请参考TheMP的答案

相关内容

  • 没有找到相关文章

最新更新