我有一个10个字段的数据集。我需要在这些数据框架上执行RDD操作。是否可以执行RDD操作,例如map
,flatMap
等。
这是我的示例代码:
df.select("COUNTY","VEHICLES").show();
这是我的dataframe
,我需要将此dataframe
转换为RDD
并在此新的RDD上操作一些RDD操作。
这是我如何将数据框架转换为rdd
的代码 RDD<Row> java = df.select("COUNTY","VEHICLES").rdd();
转换为RDD后,我看不到RDD结果,我尝试了
java.collect();
java.take(10);
java.foreach();
在上述所有情况下,我都无法获得结果。
请帮助我。
val myRdd : RDD[String] = ds.rdd
将SPARK API文档数据集检查到RDD。lazy val
rdd: RDD[T]
在您的情况下,通过执行选择后,请选择.rdd
,以创建数据框
以来Spark 2.0您可以使用toDS
函数将数据框架转换为数据集,以便使用RDD操作。
推荐有关掌握Spark 2.0
for Spark 1.6:
您将无法看到结果,就像将Dataframe
转换为RDD时,它的作用是将其转换为RDD[Row]
因此,当您尝试其中任何一种:
java.collect();
java.take(10);
java.foreach();
它将导致Array[Row]
,您无法获得结果。
解决方案:
您可以将行转换为相应的值,并像这里一样从中获取RDD
:
val newDF=df.select("COUNTY","VEHICLES")
val resultantRDD=newDF.rdd.map{row=>
val county=row.getAs[String]("COUNTY")
val vehicles=row.getAs[String]("VEHICLES")
(county,vehicles)
}
现在您可以应用foreach
和collect
功能以获取值。
P.S。:代码是用Scala编写的,但是您可以获得我想做的事情的本质!
在阅读RDD的数据之前,请尝试将RDD持续存在。
val finalRdd = mbnfinal.rdd
finalRdd.cache()
finalRdd.count()