我在驱动程序中有这个函数,它将rdds的结果收集到一个数组中并发送回来。然而,即使RDD(在数据流中)有数据,函数也会返回一个空数组。。。我做错了什么?
def runTopFunction() : Array[(String, Int)] = {
val topSearches = some function....
val summary = new ArrayBuffer[(String,Int)]()
topSearches.foreachRDD(rdd => {
summary = summary.++(rdd.collect())
})
return summary.toArray
}
因此,虽然foreachRDD
将执行您想要执行的操作,但它也是非阻塞的,这意味着它不会等到所有流都得到处理。由于您在调用foreachRDD
之后立即在缓冲区中调用toArray
,因此还没有处理任何元素。
DStream.forEachRDD
是对给定DStream
的一个操作,将计划在每个流处理批处理间隔执行。这是稍后要执行的作业的声明性构造
不支持以这种方式对值进行累加,因为当Dstream.forEachRDD只是说"在每次迭代中都这样做"时,周围的累加代码会立即执行,从而产生一个空数组。
根据summary
数据计算后发生的情况,关于如何实现这一点,有几个选项:
- 如果数据需要由另一个进程检索,请使用共享线程安全结构。优先级队列非常适合前k个用途
- 如果要存储数据(fs,db),您只需在将
topSearches
函数应用于数据流后写入存储器即可