Spark2 数据帧/RDD 进程分组



我在Hive中存储了以下名为ExampleData的表:

+--------+-----+---|
|Site_ID |Time |Age|
+--------+-----+---|
|1       |10:00| 20|
|1       |11:00| 21|
|2       |10:00| 24|
|2       |11:00| 24|
|2       |12:00| 20|
|3       |11:00| 24|
+--------+-----+---+

我需要能够按站点处理数据。不幸的是,按站点对其进行分区不起作用(有超过 100k 个站点,所有站点的数据量都相当少)。

对于每个站点,我需要分别选择"时间"列和"年龄"列,并使用它来输入函数(理想情况下,我想在执行器上运行,而不是驱动程序)

我有一个我认为我希望它如何工作的存根,但此解决方案只能在驱动程序上运行,因此速度非常慢。我需要找到一种编写它的方法,以便它将运行执行器级别:

// fetch a list of distinct sites and return them to the driver 
//(if you don't, you won't be able to loop around them as they're not on the executors)
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10")
.collect
val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData")
distinctSites.foreach(row => {
    allSiteData.filter("site_id = " + row.get(0))
    val times = allSiteData.select("time").collect()
    val ages = allSiteData.select("ages").collect()
    processTimesAndAges(times, ages)
})
def processTimesAndAges(times: Array[Row], ages: Array[Row]) {
    // do some processing
}

我尝试在所有节点上广播不同的站点,但这并没有证明是富有成效的。

这似乎是一个很简单的概念,但我花了几天时间研究这个问题。我对 Scala/Spark 很陌生,所以如果这是一个荒谬的问题,我们深表歉意!

任何建议或提示将不胜感激。

RDD API提供了许多函数,可用于从低级repartition/repartitionAndSortWithinPartitions开始,以许多*byKey方法(combineByKey,groupByKey,reduceByKey等)结束。

例:

rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
  groupByKey().
  forEachPartition( iter => doSomeJob(iter) )

在DataFrame中可以使用聚合函数,GroupedData类为最常见的函数提供了许多方法,包括计数,最大值,最小值,平均值和总和

例:

   val df = sc.parallelize(Seq(
      (1, 10.3, 10), (1, 11.5, 10),
      (2, 12.6, 20), (3, 2.6, 30))
    ).toDF("Site_ID ", "Time ", "Age")
df.show()
+--------+-----+---+
|Site_ID |Time |Age|
+--------+-----+---+
|       1| 10.3| 10|
|       1| 11.5| 10|
|       2| 12.6| 20|
|       3|  2.6| 30|
+--------+-----+---+

    df.groupBy($"Site_ID ").count.show
+--------+-----+
|Site_ID |count|
+--------+-----+
|       1|    2|
|       3|    1|
|       2|    1|
+--------+-----+

注意:正如您提到的,解决方案非常慢,您需要使用分区,在您的情况下,分区范围是不错的选择。

  • http://dev.sortable.com/spark-repartition/
  • https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html
  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

相关内容

  • 没有找到相关文章

最新更新