Spark: convert an RDD[Row] to a DataFrame where one of the row's columns is a list



I have an RDD[Row] where each row holds the following data:

[guid, List(peopleObjects)]  
["123", List(peopleObjects1, peopleObjects2, peopleObjects3)]

I want to convert it to a DataFrame, and I'm using the following code:

val personStructureType = new StructType()
    .add(StructField("guid", StringType, true))
    .add(StructField("personList", StringType, true))  
val personDF = hiveContext.createDataFrame(personRDD, personStructureType)

Should I be using a different data type in my schema instead of StringType?

It works when the list column is just a single string, but when it is an actual list I get the following error:

scala.MatchError: List(personObject1, personObject2, personObject3) (of class scala.collection.immutable.$colon$colon)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445)
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745) 

It's not entirely clear what you're trying to do, but a better way to do it is to create a case class, map the RDD rows onto the case class, and then call toDF. (The MatchError above is thrown because the schema declares the list column as StringType, so Catalyst's string converter receives a List it has no pattern for.)

Like this:

import sqlContext.implicits._ // brings the .toDF conversion on an RDD into scope

case class MyClass(guid: Int, peopleObjects: List[String])
val rdd = sc.parallelize(Array((123, List("a","b")), (1232, List("b","d"))))
val df = rdd.map(r => MyClass(r._1, r._2)).toDF
df.show
+----+-------------+
|guid|peopleObjects|
+----+-------------+
| 123|       [a, b]|
|1232|       [b, d]|
+----+-------------+
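
With the case class, Spark infers the schema by reflection, so the list column comes out as an array of strings automatically. As a quick sanity check:

// The inferred schema should show guid as an integer column and
// peopleObjects as an array of strings.
df.printSchema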

Or you can do it the long-hand way, without a case class, like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val df = sqlContext.createDataFrame(
  rdd.map(r => Row(r._1, r._2)),
  StructType(Array(
    StructField("guid", IntegerType),
    StructField("peopleObjects", ArrayType(StringType))
  ))
)
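
Applied back to the schema from the question, the same fix means declaring personList as an ArrayType rather than a StringType. A minimal sketch, assuming each people object is (or can be serialized as) a string:

import org.apache.spark.sql.types._

// The key change from the question's schema: ArrayType(StringType)
// instead of StringType for the list column.
val personStructureType = new StructType()
    .add(StructField("guid", StringType, true))
    .add(StructField("personList", ArrayType(StringType), true))
val personDF = hiveContext.createDataFrame(personRDD, personStructureType)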
