为什么Spark/Scala编译器在RDD[Map[Int, Int]]上找不到toDF ?



为什么下面的代码会出现错误?

scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> val rdd = sc.parallelize(1 to 10).map(x => (Map(x  -> 0), 0))
rdd: org.apache.spark.rdd.RDD[(scala.collection.immutable.Map[Int,Int], Int)] = MapPartitionsRDD[20] at map at <console>:27
scala> rdd.toDF
res8: org.apache.spark.sql.DataFrame = [_1: map<int,int>, _2: int]
scala> val rdd = sc.parallelize(1 to 10).map(x => Map(x  -> 0))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] = MapPartitionsRDD[23] at map at <console>:27
scala> rdd.toDF
<console>:30: error: value toDF is not a member of org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]]
              rdd.toDF

那么这里到底发生了什么,toDF可以将类型为(scala.collection.immutable.Map[Int,Int], Int)的RDD转换为数据帧,但不能将类型为scala.collection.immutable.Map[Int,Int]的RDD转换为数据帧。为什么呢?

与不能使用

的原因相同
sqlContext.createDataFrame(1 to 10).map(x => Map(x  -> 0))

如果您查看org.apache.spark.sql.SQLContext源,您将发现createDataFrame方法的两种不同实现:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame  

def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame 

正如你所看到的,两者都要求AProduct的子类。当您在RDD[(Map[Int,Int], Int)]上调用toDF时,它可以工作,因为Tuple2确实是Product。因此,Map[Int,Int]本身不是错误。

你可以用Tuple1:

Map包裹起来
sc.parallelize(1 to 10).map(x => Tuple1(Map(x  -> 0))).toDF

基本上是因为没有隐式地在RDD中为Map创建DataFrame

在你的第一个例子中,你返回一个Tuple,它是一个Product,它有一个隐式转换。

rddToDataFrameHolder[A <: Product: TypeTag](rdd: rdd [A])

在第二个示例中,您在RDD中使用Map,其中没有隐式转换。

相关内容

  • 没有找到相关文章

最新更新