有人使用spark 1.0.0获得DStream.reduce工作吗?
我有一些代码,看起来非常合理。
val word1 = messages.map {
word =>
val key = word
(key, 1)
}
val wordcount = word1.reduce(reduceCount)
private def reduceCount(count1: Int, count2: Int) : Int = {
count1 + count2
}
reduce语句得到一个编译错误:类型不匹配;found:需要整数:(String,Int)
为什么会有这样的抱怨?reduceCount应该只对int计数进行操作,reduce应该返回与word1相同的类型,即(String,int)。我尝试了很多变化来绕过这个错误,但它似乎只是表现不正确。
如果改为调用reduceByKeyAndWindow,则不存在编译错误。
val wordcount = word1.reduceByKeyAndWindow(reduceCount, batchDuration)
操作DStream.reduce
具有以下签名:
def reduce(reduceFunc: (T, T) => T): DStream[T]
从语义上讲,它采用流的两个元素的关联函数,并生成一个元素。
给定messagesDstream
是字符串流,在这样映射后:
val word1 = messagesDstream.map {word => (word,1)}
CCD_ 3的类型是Tuple2[String,Int]。这意味着reduce
应该采用具有签名的reduce函数:f(x:(String,Int), y:(String,Int)): (String, Int)
。在针对该问题提供的代码中,reduce函数为f(x:Int, y:Int):Int
。
在这种情况下,您想要使用的操作是Dstream.reduceByKey(_ + _)
,因为它将在按键对值分组后应用reduce函数,这就是单词计数的意义。