Flink流python api -redion()产生增量结果,而不是最终值



我正在尝试使用Python API在Flink上实现Kmeans叮咬算法。我正在基于0个索引进行key_by,然后尝试在每个组上进行reduce()以获取一种计数汇总。

class CentroidAccumulator(ReduceFunction):                                                                                                                                       
    def reduce(self, val1, val2):                                                                                                                                                
        id1, point1, count1 =  val1                                                                                                                                              
        id2, point2, count2 =  val2                                                                                                                                              
        return (id1, point1.add(point2), count1+count2)   
class Selector(KeySelector):                                                                                                                                                     
    def getKey(self, value):                                                                                                                                                     
        return value[0]   

nearest_points = points                                                                                                                                                 
                .map(SelectNearestPoint(centroids))                                                                                                                             
                .key_by(Selector()).reduce(CentroidAccumulator()) 
nearest_points.write_as_text("output.txt")

预期结果:

(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)

实际结果:

我获取所有写入文件的所有迭代的输出(我正在测试的样本中有40个点,因此输出具有40行(

(
(1, <kmeans_clustering.Point instance at 0x2>, 1)                                                                                                                                
(3, <kmeans_clustering.Point instance at 0x3>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x4>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x5>, 2)                                                                                                                                
.
.
.                                                                                                                
(2, <kmeans_clustering.Point instance at 0x20>, 13)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x21>, 14)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x22>, 10)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x23>, 4)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x24>, 15)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x25>, 16)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x26>, 11)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x27>, 5)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x28>, 17)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x29>, 18) 

问题是它正在减少好的,但是我只想获得每个组减少转换的最后值(这就是减少应该对我的理解作用(。我在做什么错?

您没有做错任何事情;这是流媒体降低功能的预期行为。从概念上讲,数据流是一个无尽的数据流 - 因此,"等到结束"以产生结果是没有意义的。流媒体程序的标准行为是为每个事件产生结果。

当然,这可能有点不方便。如果您只想看到最终结果,则必须有某种方法表明结局已经到来。随着批处理程序,这很自然。使用流应用程序,有限数据源发送带有值max_watermark的水印,可用于检测输入已达到其末尾 - 您可以使用活动时间计时器的过程函数捕获此功能,但这是一个有些复杂的解决方案。您也可以使用Windows来实现一种解决方法。

相关内容

  • 没有找到相关文章

最新更新