Spark flatmap给出了迭代器错误



如果我在JSONArray上应用flatMap到JSONObject,我会得到错误如果我从eclipse在本地(笔记本电脑)上运行,它运行得很好,但是在集群(YARN)上运行时,它会给出奇怪的错误。Spark Version 2.0.0

代码:

JavaRDD<JSONObject> rdd7 = rdd6.flatMap(new FlatMapFunction<JSONArray, JSONObject>(){
    @Override
    public Iterable<JSONObject> call(JSONArray array) throws Exception {
        List<JSONObject> list = new ArrayList<JSONObject>();
        for (int i = 0; i < array.length();list.add(array.getJSONObject(i++)));             
        return list;
    }
});

错误日志:

java.lang.AbstractMethodError: com.pwc.spark.tifcretrolookup.TIFCRetroJob$2.call(Ljava/lang/Object;)Ljava/util/Iterator;
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at com.pwc.spark.ElasticsearchClientLib.CommonESClient.index(CommonESClient.java:33)
    at com.pwc.spark.ElasticsearchClientLib.ESClient.call(ESClient.java:34)
    at com.pwc.spark.ElasticsearchClientLib.ESClient.call(ESClient.java:15)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

从Spark 2.0.0开始,flatMap调用内部的函数必须返回Iterator而不是Iterable,正如发布说明所述:

Java RDD的flatMap和mapPartitions函数用于要求函数返回Java Iterable。它们已被更新为要求函数返回Java迭代器,因此函数不需要物化所有数据。

这里是相关的Jira问题

最新更新