我能够计算火花收集的每个起始字母的平均单词长度
val animals23 = sc.parallelize(List(("a","ant"), ("c","crocodile"), ("c","cheetah"), ("c","cat"), ("d","dolphin"), ("d","dog"), ("g","gnu"), ("l","leopard"), ("l","lion"), ("s","spider"), ("t","tiger"), ("w","whale")), 2)
使用
animals23.
aggregateByKey((0,0))(
(x, y) => (x._1 + y.length, x._2 + 1),
(x, y) => (x._1 + y._1, x._2 + y._2)
).
map(x => (x._1, x._2._1.toDouble / x._2._2.toDouble)).
collect
或使用
animals23.
combineByKey(
(x:String) => (x.length,1),
(x:(Int, Int), y:String) => (x._1 + y.length, x._2 + 1),
(x:(Int, Int), y:(Int, Int)) => (x._1 + y._1, x._2 + y._2)
).
map(x => (x._1, x._2._1.toDouble / x._2._2.toDouble)).
collect
每个都导致
Array((a,3.0), (c,6.333333333333333), (d,5.0), (g,3.0), (l,5.5), (w,5.0), (s,6.0), (t,5.0))
我不明白的是:为什么要求我在第二个例子中明确说明函数中的类型,而第一个例子的函数可以不这样做?
我说的是
(x, y) => (x._1 + y.length, x._2 + 1),
(x, y) => (x._1 + y._1, x._2 + y._2)
与
(x:(Int, Int), y:String) => (x._1 + y.length, x._2 + 1),
(x:(Int, Int), y:(Int, Int)) => (x._1 + y._1, x._2 + y._2)
这可能更像是一个Scala问题,而不是一个Spark问题。
为什么要求我明确说明中函数中的类型第二个例子,而第一个例子的功能可以没有?
因为在第一个例子中,编译器能够根据提供的第一个参数列表推断seqOp
的类型。aggregateByKey
正在使用currying:
def aggregateByKey[U](zeroValue: U)
(seqOp: (U, V) ⇒ U,
combOp: (U, U) ⇒ U)
(implicit arg0: ClassTag[U]): RDD[(K, U)]
类型推断在Scala中的工作方式是,编译器能够根据第一个参数列表推断第二个参数列表的类型。所以在第一个例子中,它知道seqOp
是函数((Int, Int), String) => (Int, Int)
,combOp
也是如此。
相反,combineByKey
只有一个参数列表:
combineByKey[C](createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
在没有明确说明类型的情况下,编译器不知道将x
和y
推断为什么。
要帮助编译器,可以显式指定类型参数:
animals23
.combineByKey[(Int, Int)](x => (x.length,1),
(x, y) => (x._1 + y.length, x._2 + 1),
(x, y) => (x._1 + y._1, x._2 + y._2))
.map(x => (x._1, x._2._1.toDouble / x._2._2.toDouble))
.collect