为什么sc.cassandraTable("test","users").select("username")的map函数不能工作?



在spark-cassandra连接器的演示和安装cassandra/spark OSS堆栈之后,在spark-shell下,我尝试了以下片段:

sc.stop
val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "172.21.0.131")
      .set("spark.cassandra.auth.username", "adminxx")
      .set("spark.cassandra.auth.password", "adminxx")
val sc = new SparkContext("172.21.0.131", "Cassandra Connector Test", conf)
val rdd = sc.cassandraTable("test", "users").select("username")

rdd的许多操作符可以很好地工作,例如:

rdd.first
rdd.count

但当我使用map:时

val result = rdd.map(x => 1) //just for simple
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[61] at map at <console>:32

然后,我运行:

result.first

我得到了以下错误:

15/12/11 15:09:00 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 104, 124.250.36.124): java.lang.ClassNotFoundException: 
$line346.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1

Caused by: java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)

我不知道为什么我会犯这样的错误?任何建议都将不胜感激!

更新时间:根据@RussSpitzer对CassandraRdd.map(row=>row.getInt("id))的回答,发生了java.lang.ClassNotFoundException!,我通过以下错误解决了这个错误,我没有使用sc.stop并创建新的SparkContext,而是用选项启动spark-shell

bin/spark-shell -conf spark.cassandra.connection.host=172.21.0.131 --conf spark.cassandra.auth.username=adminxx --conf spark.cassandra.auth.password=adminxx

然后所有的步骤都是一样的,并且运行良好。

Russell Spitzer从spark-connector-user列表中得到的答案:

我敢肯定,这里的主要问题是用--jars启动一个上下文,然后杀死那个上下文,然后启动另一个上下文。试着简化你的代码,而不是设置所有的spark-conf选项,并创建一个像shell一样运行的新上下文。此外,类路径上需要的jar是连接器程序集jar,而不是要运行的Scala脚本的自定义构建。

./spark-shell --conf spark.casandra.connection.host=10.129.20.80 ...

您不需要修改ack.wait.timeoutexecutor.extraClasspath

Spark应用程序通常将编译后的代码作为jar文件发送给执行器。这样一来,map的函数就会出现在执行器上。

spark-shell的情况更为棘手。它必须以交互方式编译和广播每一行的代码。甚至连你在里面操作的课都没有。它创建了这些伪$$iwC$$类来解决这个问题。

通常情况下,这很好,但您可能遇到了spark-shell错误。您可以尝试通过将代码放在spark-shell:中的类中来解决它

object Obj { val mapper = { x: String => 1 } }
val result = rdd.map(Obj.mapper)

但是,将代码实现为应用程序可能是最安全的,而不仅仅是用spark-shell编写代码。

相关内容

  • 没有找到相关文章

最新更新