FLINK:DATASET.COUNT()是瓶颈 - 如何平行计数



我正在使用flink学习地图 - 还需要疑问如何有效地计算数据集中的元素。到目前为止,我拥有的是:

DataSet<MyClass> ds = ...;
long num = ds.count();

执行此操作时,在我的flink日志中说

12/03/2016 19:47:27 Datasink(count())(1/1)切换到运行

因此,只使用了一个CPU(我有四个和其他命令,例如减少使用所有命令)。

我认为count()内部从所有四个CPU中收集数据集,并依次对其进行计数,而不是让每个CPU计数其零件,然后将其总结。是真的吗?

如果是,我该如何利用我的所有CPU?首先将我的数据集映射到包含原始值作为第一个项目的2块和长度值1作为第二项,然后使用总和函数进行汇总?

例如,数据集将映射到数据集>长度为1。

计算数据集中项目的最佳实践是什么?

问候西蒙

DataSet#count()是一个非并行操作,因此只能使用一个线程。

您将逐键进行并行化,并对您的密钥计数应用最终款项,以达到整体计数以加快计算的速度。

这是一个很好的解决方案吗?

DataSet<Tuple1<Long>> x = ds.map(new MapFunction<MyClass, Tuple1<Long>>() { 
    @Override public Tuple1<Long> map(MyClass t) throws Exception { 
        return new Tuple1<Long>(1L); 
    } 
}).groupBy(0).sum(0);
Long c = x.collect().iterator().next().f0;

相关内容

  • 没有找到相关文章

最新更新