Spark 2.1.1 error on EMR when reading from / writing to Redshift

I am trying to read from and write to Redshift (staging the data in S3), but I get a strange error as soon as I act on the DataFrame. The DataFrame itself is created fine, and the connector can clearly reach the table, because it prints the table's column names:

scala> :require /home/hadoop/spark-redshift_2.10-2.0.1.jar
Added '/home/hadoop/spark-redshift_2.10-2.0.1.jar' to classpath.
scala> :require /home/hadoop/RedshiftJDBC41-1.2.12.1017.jar
Added '/home/hadoop/RedshiftJDBC41-1.2.12.1017.jar' to classpath.
scala> :require /home/hadoop/spark-avro_2.11-3.2.0.jar
Added '/home/hadoop/spark-avro_2.11-3.2.0.jar' to classpath.
scala>   val read_data = (spark.read
|     .format("com.databricks.spark.redshift")
|     .option("url", "jdbc:redshift://redshifthost/schema?user=admin&password=password")
|     .option("query", "SELECT * FROM schema.table LIMIT 1")
|     .option("tempdir", tempS3Dir)
|     .option("forward_spark_s3_credentials", true)
|     .load())
read_data: org.apache.spark.sql.DataFrame = [aid: int, uid: int ... 3 more fields]
scala> read_data.count()

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1493)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1492)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1492)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1720)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2420)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2419)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2419)

The problem was with how I was loading the packages, i.e. with the builds I was mixing: the :require lines above combine a Scala 2.10 artifact (spark-redshift_2.10) with Scala 2.11 artifacts, while Spark 2.1.1 on EMR is built for Scala 2.11, and a ClassCastException during task deserialization typically points to exactly this kind of classpath mismatch between the driver and the executors. Launching the shell with a consistent set of _2.11 packages worked like a charm:

./bin/spark-shell --packages com.databricks:spark-avro_2.11:3.2.0,com.databricks:spark-redshift_2.11:2.0.1,com.databricks:spark-csv_2.11:1.5.0 --jars /home/hadoop/RedshiftJDBC41-1.2.12.1017.jar
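
With the shell launched this way, the read above works, and writing back to Redshift follows the same pattern. Below is a minimal sketch of a write; the target table schema.table_copy is a hypothetical name, and the url and tempdir values are the same placeholders as in the read:

(read_data.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost/schema?user=admin&password=password")
  .option("dbtable", "schema.table_copy")  // hypothetical target table
  .option("tempdir", tempS3Dir)            // same S3 staging dir as the read
  .option("forward_spark_s3_credentials", true)
  .mode("error")                           // fail if the table already exists
  .save())

The connector stages the rows under tempdir (Avro by default in spark-redshift 2.x) and then issues a Redshift COPY, which is why spark-avro also has to be on the classpath with a matching Scala build. If in doubt about which artifact suffix to use, util.Properties.versionString in the shell prints the REPL's Scala version.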