I created a JSONArray and built an RDD from it. When I pass that RDD to sqlContext.jsonRDD(rdd), I get the following error:
Error: application failed with exception
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 5, esu3v148.federated.fds): java.lang.ClassCastException: org.json.simple.JSONObject cannot be cast to java.lang.String
at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:307)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:885)
at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:884)
at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1534)
at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1534)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The JSONArray is created and used in Spark as follows:
JSONArray jsonResultArray = new JSONArray();
SparkConf sparkConf = new SparkConf().setAppName("HBaseTest");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(60));
SQLContext sqlContext = new SQLContext(sc);
if (!jsonResultArray.isEmpty()) {
    @SuppressWarnings("unchecked")
    //JavaRDD<String> rdd = sc.parallelize(jsonResultArray);
    DataFrame input = sqlContext.jsonRDD(sc.parallelize(jsonResultArray));
}
Please help me figure out how to resolve this. Thanks.
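sqlContext.jsonRDD expects an argument of type JavaRDD<java.lang.String>. A JSONArray is a list of org.json.simple.JSONObject, so sc.parallelize(jsonResultArray) creates a JavaRDD<JSONObject>, and passing that to jsonRDD throws the exception. This would normally be a compile-time error, but the compiler is misled by the fact that org.json.simple.JSONArray extends the generic List without an explicit type argument (a raw type), so the mismatch is only detected at runtime.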
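To see why the compiler stays silent, here is a minimal sketch that reproduces the same effect without Spark or json-simple. RawArray and totalLength are hypothetical stand-ins (for JSONArray and for an API that, like jsonRDD, requires String elements); they are not part of either library.

```java
import java.util.ArrayList;
import java.util.List;

class RawListDemo {
    // Hypothetical stand-in for org.json.simple.JSONArray:
    // it extends ArrayList without a type argument, i.e. as a raw type.
    @SuppressWarnings({"rawtypes", "serial"})
    static class RawArray extends ArrayList { }

    // Hypothetical stand-in for an API that requires String elements.
    static int totalLength(List<String> jsonLines) {
        int total = 0;
        for (String line : jsonLines) { // the cast to String happens here, at runtime
            total += line.length();
        }
        return total;
    }

    @SuppressWarnings("unchecked")
    static boolean failsAtRuntime() {
        RawArray array = new RawArray();
        array.add(new StringBuilder("{\"a\":1}")); // an element that is not a String
        try {
            // Compiles with only an "unchecked" warning, because a raw list
            // may be passed where List<String> is expected.
            totalLength(array);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException at runtime: " + failsAtRuntime());
    }
}
```

The call compiles because of the raw type, and the cast only fails once an element is actually read as a String, which matches where the stack trace in the question points (inside JsonRDD's parsing code rather than at the call site).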
If you really must use JSONArray, you have to map its elements to strings, either before or after creating the RDD, for example:
final JavaRDD<JSONObject> jsonObjectRDD = sc.parallelize((List<JSONObject>) jsonResultArray);
final JavaRDD<String> jsonStringRDD = jsonObjectRDD.map(new Function<JSONObject, String>() {
    @Override
    public String call(JSONObject v) throws Exception {
        return v.toJSONString();
    }
});
DataFrame input = sqlContext.jsonRDD(jsonStringRDD);
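The snippet above converts after creating the RDD. For the "before" variant, a rough sketch could look like the following. JsonStrings and toJsonStrings are hypothetical names introduced here, and the helper relies on the assumption that each element's toString() yields its JSON text (json-simple's JSONObject.toString() delegates to toJSONString()).

```java
import java.util.ArrayList;
import java.util.List;

class JsonStrings {
    // Hypothetical helper: turns every element of a (possibly raw) list into
    // its string form so the result is a properly typed List<String>.
    // Assumes toString() returns the element's JSON text.
    static List<String> toJsonStrings(List<?> jsonArray) {
        List<String> out = new ArrayList<String>(jsonArray.size());
        for (Object element : jsonArray) {
            out.add(String.valueOf(element)); // a null element becomes the text "null"
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> sample = new ArrayList<Object>();
        sample.add(new StringBuilder("{\"a\":1}")); // stand-in for a JSONObject
        System.out.println(toJsonStrings(sample));
    }
}
```

With this in place, the call from the question would become sqlContext.jsonRDD(sc.parallelize(JsonStrings.toJsonStrings(jsonResultArray))), which type-checks without a cast because parallelize now receives a List<String>. The conversion runs on the driver, so this is only reasonable when the array is small enough to be built there in the first place, as it already is in the question.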