我正在使用flink学习地图 - 还需要疑问如何有效地计算数据集中的元素。到目前为止,我拥有的是:
DataSet<MyClass> ds = ...;
long num = ds.count();
执行此操作时,在我的flink日志中说
12/03/2016 19:47:27 Datasink(count())(1/1)切换到运行
因此,只使用了一个CPU(我有四个和其他命令,例如减少使用所有命令)。
我认为count()内部从所有四个CPU中收集数据集,并依次对其进行计数,而不是让每个CPU计数其零件,然后将其总结。是真的吗?
如果是,我该如何利用我的所有CPU?首先将我的数据集映射到包含原始值作为第一个项目的2块和长度值1作为第二项,然后使用总和函数进行汇总?
例如,数据集将映射到数据集>长度为1。
计算数据集中项目的最佳实践是什么?
问候西蒙
DataSet#count()
是一个非并行操作,因此只能使用一个线程。
您将逐键进行并行化,并对您的密钥计数应用最终款项,以达到整体计数以加快计算的速度。
这是一个很好的解决方案吗?
DataSet<Tuple1<Long>> x = ds.map(new MapFunction<MyClass, Tuple1<Long>>() {
@Override public Tuple1<Long> map(MyClass t) throws Exception {
return new Tuple1<Long>(1L);
}
}).groupBy(0).sum(0);
Long c = x.collect().iterator().next().f0;