在Spark中排序RDD



我有一个列出客户购买的一般项目的数据集。csv中的每条记录从左到右列出了客户购买的商品。例如(缩短的样本):

Bicycle, Helmet, Gloves
Shoes, Jumper, Gloves
Television, Hat, Jumper, Playstation 5

我希望把它放在scala的RDD中,并对它们执行计数。

case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: String)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(line => line.split(",")).countByValue();
上面是一个简短的代码示例。第一行是case类(尚未使用)。第二行从csv中获取数据并将其放入rdd_1中。很容易。第三行执行flatmap,用逗号分隔数据,然后对每个数据进行计数。比如,"手套"one_answers";Jumper"上面会有数字2。其他的1。它看起来像一个元组的集合。到目前为止一切顺利。

接下来,我要对rdd_2进行排序,列出购买最多的前3个商品。我能用RDD做到这一点吗?或者我是否需要将RDD转移到数据框架中来实现排序?如果有,我该怎么做?

如何将第1行中的case类应用于rdd_2,这似乎是一个元组列表?我应该采用这种方法吗?

Thanks in advance

case类中的计数应该是一个整数…如果您想将结果保留为RDD,我建议使用reduceByKey而不是countByValue,它返回Map[String, Long]而不是RDD。

我还建议按,而不是,分开,以避免项目名称中的前导空格。

case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(_.split(", "))
.map((_, 1))
.reduceByKey(_ + _)
.map(line => SalesItemSummary(line._1, line._2))
rdd_2.collect()
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))

对RDD进行排序,您可以使用sortBy:

val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)
top3
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))

相关内容

  • 没有找到相关文章

最新更新