从数据流驱动程序中的RDD收集结果



我在驱动程序中有这个函数,它将rdds的结果收集到一个数组中并发送回来。然而,即使RDD(在数据流中)有数据,函数也会返回一个空数组。。。我做错了什么?

def runTopFunction() : Array[(String, Int)] = {
        val topSearches = some function....
        val summary = new ArrayBuffer[(String,Int)]()
        topSearches.foreachRDD(rdd => {
            summary = summary.++(rdd.collect())
        })    
    return summary.toArray
}

因此,虽然foreachRDD将执行您想要执行的操作,但它也是非阻塞的,这意味着它不会等到所有流都得到处理。由于您在调用foreachRDD之后立即在缓冲区中调用toArray,因此还没有处理任何元素。

DStream.forEachRDD是对给定DStream的一个操作,将计划在每个流处理批处理间隔执行。这是稍后要执行的作业的声明性构造

不支持以这种方式对值进行累加,因为当Dstream.forEachRDD只是说"在每次迭代中都这样做"时,周围的累加代码会立即执行,从而产生一个空数组。

根据summary数据计算后发生的情况,关于如何实现这一点,有几个选项:

  • 如果数据需要由另一个进程检索,请使用共享线程安全结构。优先级队列非常适合前k个用途
  • 如果要存储数据(fs,db),您只需在将topSearches函数应用于数据流后写入存储器即可

最新更新