假设我有一个分布在3个节点上的分布式系统,我的数据分布在这些节点之间。例如,我有一个test.csv文件,它存在于所有3个节点上,它包含2列:
**row | id, c.**
---------------
row1 | k1 , c1
row2 | k1 , c2
row3 | k1 , c3
row4 | k2 , c4
row5 | k2 , c5
row6 | k2 , c6
row7 | k3 , c7
row8 | k3 , c8
row9 | k3 , c9
row10 | k4 , c10
row11 | k4 , c11
row12 | k4 , c12
然后我使用SparkContext。textFile读取文件为rdd,等等。据我所知,每个spark worker节点将从文件中读取a部分。现在我们设每个节点存储:
- 节点1:第1~第4行
- 节点2:第5~8行
- 节点3:第9~12行
我的问题是,假设我想对这些数据进行计算,并且有一步我需要将密钥分组在一起,因此密钥值对将是[k1 [{k1 c1} {k1 c2} {k1 c3}]]..
,等等。
有一个叫做groupByKey()
的函数,使用起来非常昂贵,建议使用aggregateByKey()
。所以我想知道groupByKey()
和aggregateByKey()
是如何在引擎盖下工作的?有人能用我上面提供的例子解释一下吗?洗牌后,行在每个节点上驻留在哪里?
aggregateByKey()与reduceByKey有很大的不同。reduceByKey是aggregateByKey的一种特殊情况
aggregateByKey()将组合特定键的值,这种组合的结果可以是您指定的任何对象。您必须指定如何在一个分区(在同一节点中执行)中组合("添加")这些值,以及如何组合来自不同分区(可能在不同节点中)的结果。reduceByKey是一种特殊的情况,在某种意义上,组合的结果(例如求和)与值具有相同的类型,并且来自不同分区的组合操作也与在分区内组合值的操作相同。
一个例子:假设你有一对的列表。你并行化它:
val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
现在您想要通过键"组合"它们生成一个和。在这种情况下,reduceByKey和aggregateByKey是相同的:
val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))
//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))
现在,假设您希望聚合为值的集合,这是一个不同类型的值,是整数(整数的和也是整数):
import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])] =Array((b,Set(7)), (a,Set(1, 5, 3)))
aggregateByKey()
与reduceByKey()
几乎相同(两者都在幕后调用combineByKey()
),只是您为aggregateByKey()
提供了一个起始值。大多数人都熟悉reduceByKey()
,所以我将在解释中使用它。
reduceByKey()
更好的原因是它利用了MapReduce称为组合器的特性。像+
或*
这样的任何函数都可以以这种方式使用,因为调用它的元素的顺序无关紧要。这允许Spark开始"减少"具有相同键的值,即使它们不在同一个分区中。
另一方面,groupByKey()
为您提供了更多的多功能性,因为您编写了一个接受Iterable的函数,这意味着您甚至可以将所有元素拉入数组。然而,它是低效的,因为要使其工作,所有(K,V,)
对必须在一个分区中。
在reduce类型操作中移动数据的步骤通常称为shuffle,在最简单的级别上,数据被划分到每个节点(通常使用散列分区器),然后在每个节点上排序。