可以实时更新带有火花流的值



假设我有一个双重值流,我想每十秒钟计算平均值。我如何才能有一个不需要重新计算平均值的滑动窗口,而是通过删除最古老的十秒钟的一部分并仅添加新的10秒值来对其进行更新?

tl; dr:在其两个函数参数中使用 reduceByWindow(跳到代码shippet的最后一段)

有两个解释您的问题,即特定的问题(我如何获得一个小时的跑步,每2秒更新一次)和一般的解释(我如何获得以稀疏方式更新状态的计算)。这是一般一个的答案。

首先,请注意,可以根据窗口的Dstream易于计算您的数据,以表明您的平均值更高,这将您的数据表示为流的增量构造,并具有最大共享。但是,在计算上,重新计算每批平均值的效率较低 - 如您所指出的。

如果您确实想更新复杂的状态计算,该计算是可逆的,但不想触摸流的构造,则有updateStateByKey - 但是Spark无助于您反映您的增量方面流中的计算,您必须自己管理。

在这里,您确实有一些简单且可逆的东西,并且没有钥匙的概念。您可以使用reduceByWindow的反向减少参数,使用通常的函数,该功能使您可以计算增量均值。

val myInitialDStream: DStream[Float]
val myDStreamWithCount: DStream[(Float, Long)] = 
  myInitialDStream.map((x) => (x, 1L))
def addOneBatchToMean(previousMean: (Float, Long), newBatch: (Float, Long)): (Float, Long) = 
  (previousMean._1 + newBatch._1, previousMean._2 + newBatch._2)
def removeOneBatchToMean(previousMean: (Float, Long), oldBatch: (Float, Long)): (Float, Long) = 
  (previousMean._1 - oldBatch._1, previousMean._2 - oldBatch._2)
val runningMeans = myDStreamWithCount.reduceByWindow(addOneBatchToMean, removeOneBatchToMean, Durations.seconds(3600), Duractions.seconds(2))

您获得了一元素RDD s的流,每个元素包含一对(m,n),其中m是1H窗口上的运行总和,n n,n是1H窗口中的元素数量。只需返回(或map到)m/n即可获得平均值。

最新更新