我正在尝试使用Python API在Flink上实现Kmeans叮咬算法。我正在基于0个索引进行key_by
,然后尝试在每个组上进行reduce()
以获取一种计数汇总。
class CentroidAccumulator(ReduceFunction):
def reduce(self, val1, val2):
id1, point1, count1 = val1
id2, point2, count2 = val2
return (id1, point1.add(point2), count1+count2)
class Selector(KeySelector):
def getKey(self, value):
return value[0]
nearest_points = points
.map(SelectNearestPoint(centroids))
.key_by(Selector()).reduce(CentroidAccumulator())
nearest_points.write_as_text("output.txt")
预期结果:
(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)
实际结果:
我获取所有写入文件的所有迭代的输出(我正在测试的样本中有40个点,因此输出具有40行(
((1, <kmeans_clustering.Point instance at 0x2>, 1)
(3, <kmeans_clustering.Point instance at 0x3>, 1)
(2, <kmeans_clustering.Point instance at 0x4>, 1)
(2, <kmeans_clustering.Point instance at 0x5>, 2)
.
.
.
(2, <kmeans_clustering.Point instance at 0x20>, 13)
(2, <kmeans_clustering.Point instance at 0x21>, 14)
(1, <kmeans_clustering.Point instance at 0x22>, 10)
(4, <kmeans_clustering.Point instance at 0x23>, 4)
(2, <kmeans_clustering.Point instance at 0x24>, 15)
(2, <kmeans_clustering.Point instance at 0x25>, 16)
(1, <kmeans_clustering.Point instance at 0x26>, 11)
(4, <kmeans_clustering.Point instance at 0x27>, 5)
(2, <kmeans_clustering.Point instance at 0x28>, 17)
(2, <kmeans_clustering.Point instance at 0x29>, 18)
问题是它正在减少好的,但是我只想获得每个组减少转换的最后值(这就是减少应该对我的理解作用(。我在做什么错?
您没有做错任何事情;这是流媒体降低功能的预期行为。从概念上讲,数据流是一个无尽的数据流 - 因此,"等到结束"以产生结果是没有意义的。流媒体程序的标准行为是为每个事件产生结果。
当然,这可能有点不方便。如果您只想看到最终结果,则必须有某种方法表明结局已经到来。随着批处理程序,这很自然。使用流应用程序,有限数据源发送带有值max_watermark的水印,可用于检测输入已达到其末尾 - 您可以使用活动时间计时器的过程函数捕获此功能,但这是一个有些复杂的解决方案。您也可以使用Windows来实现一种解决方法。