我正在我的本地机器上测试ElasticSearch和Spark的集成,使用ElasticSearch中加载的一些测试数据。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
代码运行良好,并成功返回正确的结果esRDD.first ()
但是,esRDD.collect()将生成异常:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我相信这与这里提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html所以我添加了这一行
conf.set("spark.serializer", classOf[KryoSerializer].getName)
我应该做些什么让它工作吗?谢谢你
更新:解决了序列化设置问题。通过使用
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
不是conf.set("spark.serializer", classOf[KryoSerializer].getName)
现在有另一个这个数据集中有1000条不同的记录
esRDD.count()
返回1000没有问题,但是
esRDD.distinct().count()
返回5 !如果我打印记录
esRDD.foreach(println)
正确打印出1000条记录。但是如果我使用collect或take
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
它将打印duplicate记录,并且确实只有5个UNIQUE记录显示,这似乎是整个数据集的随机子集-它不是前5条记录。如果我保存RDD并读取它
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2的行为与预期一致。我想知道是否有一个bug,或者一些我不理解的关于收集/拿走的行为。还是因为我在本地运行。默认情况下,Spark RDD似乎使用5个分区,如"Spark -output"文件的part-xxxx文件数量所示。这可能就是esRDD.collect()和esRDD.distinct()返回5个唯一记录的原因,而不是其他随机数。但这还是不对。
您应该使用以下代码进行初始化:
val sparkConf = new SparkConf().setAppName("Test").setMaster("local").set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
你可以试试
val spark = new SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("es.nodes",localhost)
.set("es.port","9200")
.appName("ES")
.master("local[*]")
val data = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.query", "?q=firstname:Daniel")")
.load("bank/account").rdd
data.first()
data.collect()