spark schema rdd to RDD



我想在spark中做单词计数,我使用spark sql创建了一个rdd来从数据集中提取不同的tweet。我想在RDD上使用拆分函数,但它不允许我这样做。

错误:-value split不是org.apache.spark.sql.SchemaRdd的成员

Spark代码不工作做字数统计:-

val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both the rdd disnt worked
distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

但是当我从sparksql输出数据到一个文件并再次加载它并运行split时,它工作了。

工作示例代码:-

val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")

我需要一个答案,而不是写入文件并再次加载我的分割函数是否有一种工作可以使分割函数工作

谢谢

首先,我们不应该先做collect()然后并行化来创建RDD;这会让司机很忙的。

,

val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring)    

[考虑到这一点,您只在查询中选择单列- distinct(text)]

现在distinct_tweets_op只是一个RDD。

循环遍历这个RDD;并且您可以在该RDD中的每个字符串上应用split(")函数。

找到了答案,将数据帧或spark.sql.row.RDD转换为普通RDD的过程分为三步。

sc.parallelize(列表())映射到字符串

val distinct_tweets=hiveCtx.sql(" select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val distinct_tweets_list=sc.parallelize(List(distinct_tweets_op))
val distinct_tweets_string=distinct_tweets.map(x=>x.toString)
val test_kali=distinct_tweets_string.flatMap(line =>line.split(" ")).map(word => (word,1)).reduceByKey(_+_).sortBy {case (key,value) => -value}.map { case (key,value) => Array(key,value).mkString(",") }
test_kali.collect().foreach(println)
case class kali_test(text: String)
val test_kali_op=test_kali.map(_.split(" ")).map(p => kali_test(p(0)))
test_kali_op.registerTempTable("kali_test")
hiveCtx.sql(" select * from kali_test limit 10 ").collect().foreach(println)

这样我就不需要加载文件了,我可以动态地进行操作。

谢谢斯里兰卡

第一次失败的主要原因是这一行:

val distinct_tweets_List=sc.parallelize(List(distinct_tweets))

这在Spark中是完全无用的一行,而且比无用更糟糕——正如你看到的那样,它耗尽了你的系统。

您希望避免执行collect(),这会创建Array并将其返回给Driver应用程序。相反,您希望尽可能长时间地将对象保留为rdd,并向驱动程序返回尽可能少的数据(如键和减少后的计数)。

但是要回答您的基本问题,下面将采用由单个StringType列组成的DataFrame并将其转换为RDD[String]:

val myRdd = myDf.rdd.map(_.getString(0))

虽然SchemaRDD已经不存在了,但我相信下面的代码将把SchemaRDD与单个String列转换为普通的RDD[String]:

val myRdd = mySchemaRdd.map(_.getString(0))

最新更新