当没有新的数据以供当前的微型捕获新的数据时,从以前的Microbatch中启动蒸汽蒸汽的updateStateByKey



我不清楚如果我想在所有微观匹配的所有微观匹配上显示一些简单计数的键,无论在不结合时是否可以使用标准的火花流或以适当的方式加入!考虑到当前处理可能没有排名前5位的过程的保存/持久数据集,该数据可能没有。

即。如果初始微键当前处理的前5个是x,y,z,a,b,那么,如果仅作为数据,下一个microbatch具有x,c,m,我可以检索A,B和Y和Z作为前5名的一部分,如果C和M的发生率少于前五名?

可能是不良用例。

rdd1 = sc.parallelize(list('abcd')).map(lambda x: (x, 110 - ord(x)))
rdd2 = sc.parallelize(list('cdef')).map(lambda x: (x, 2))
rddQueue = ssc.queueStream([rdd1, rdd2])

def func(new_values, old_value):
    return sum(new_values) + (old_value or 0)

rddQueue = rddQueue.updateStateByKey(func).transform(lambda x: x.sortBy(lambda y: y[1], ascending=False))
rddQueue.pprint()

输出:

-------------------------------------------                                     
Time: 2016-12-16 11:06:54
-------------------------------------------
('a', 13)
('b', 12)
('c', 11)
('d', 10)
-------------------------------------------                                     
Time: 2016-12-16 11:06:57
-------------------------------------------
('a', 13)
('c', 13)
('b', 12)
('d', 12)
('f', 2)
('e', 2)

'检索'意味着什么?

最新更新