How do I extract string values from an RDD of Array[Array[String]] in spark-shell?



I have an array as follows:

Array[Array[String]] = Array(Array(1,1,1,300,item1), Array(2,1,2,300,item2), Array(3,1,2,300,item3), Array(4,2,3,100,item4), Array(5,1,3,300,item5))

I want to extract ((1,1)(1,2)(1,2)(2,3)(1,3)), i.e. the 2nd and 3rd elements of each inner Array. When I apply this transformation to the RDD in spark-shell:

val arr = flat.map(array => (array(0), array(1)))
arr.collect

Then I get the following error:

[Stage 6:>                                                          (0 + 0) / 2]20/02/12 02:03:01 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 13)
java.lang.ArrayIndexOutOfBoundsException: 1
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkCont
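
The ArrayIndexOutOfBoundsException: 1 means at least one inner array has fewer than two elements, so array(1) does not exist. A minimal diagnostic sketch, assuming flat is the RDD in question, is to collect the length of each inner array:

scala> flat.map(array => array.length).collect

If this prints Array(1, 1, 1, 1, 1), each row holds a single comma-joined string such as "1,1,1,300,item1" rather than five separate fields.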

Edit 1: here is my full code after applying the first answer. I still cannot extract the two elements from each inner Array:

scala> flat.collect
res3: Array[Array[String]] = Array(Array(1,1,1,300,item1), Array(2,1,2,300,item2), Array(3,1,2,300,item3), Array(4,2,3,100,item4), Array(5,1,3,300,item5))
scala> val parl = sc.parallelize(flat)
<console>:31: error: type mismatch;
found   : org.apache.spark.rdd.RDD[Array[String]]
required: Seq[?]
Error occurred in an application involving default arguments.
val parl = sc.parallelize(flat)
scala> val parl = sc.parallelize(flat.collect)
parl: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[6] at parallelize at <console>:31
scala> parl.collect
res4: Array[Array[String]] = Array(Array(1,1,1,300,item1), Array(2,1,2,300,item2), Array(3,1,2,300,item3), Array(4,2,3,100,item4), Array(5,1,3,300,item5))
scala> val gvk = parl.map(array=>(array(0),array(1)))
gvk: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[7] at map at <console>:33
scala> gvk.collect
20/02/12 03:27:29 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 13)
java.lang.ArrayIndexOutOfBoundsException: 1
at $line29.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
at $line29.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
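
As an aside, the type-mismatch error above is expected: sc.parallelize takes a local Scala collection (a Seq), not an RDD, and flat is already an RDD. sc.parallelize(flat.collect) only compiles because collect first pulls the data back to the driver as a local Array; it does not change the contents of the inner arrays, which is why the same exception reappears.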

Try this:

scala> val rdd=sc.parallelize(Array(Array("1","1","1","300","item1"), Array("2","1","2","300","item2"), Array("3","1","2","300","item3"), Array("4","2","3","300","item4"), Array("5","1","3","300","item5")))
rdd: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd.map(array=>(array(0),array(1))).collect
res2: Array[(String, String)] = Array((1,1), (2,1), (3,1), (4,2), (5,1))
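
One caveat, which the transcripts only hint at: in the question, the collected rows print without spaces between fields (Array(1,1,1,300,item1)), whereas a genuine five-element Array[String], as in the snippet above, would print as Array(1, 1, 1, 300, item1). That suggests each inner array in flat holds a single comma-joined string, which would also explain the ArrayIndexOutOfBoundsException: 1. A sketch of the fix under that assumption: split each row into its fields first, then index into the result. Note that the 2nd and 3rd fields the question asks for are at indices 1 and 2.

scala> val fields = flat.map(row => row(0).split(","))
scala> fields.map(a => (a(0), a(1))).collect   // 1st and 2nd fields: Array((1,1), (2,1), (3,1), (4,2), (5,1))
scala> fields.map(a => (a(1), a(2))).collect   // 2nd and 3rd fields: Array((1,1), (1,2), (1,2), (2,3), (1,3))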
