用Java 8流和lambdas进行并行唯一单词计数的最佳方法是什么?
我已经想出了几个,但我不相信它们是最优的。我知道Hadoop上的map reduce解决方案,不知道它们是否提供了相同的并行性。
// Map Reduce Word Count
Map<String, Integer> wordCount = Stream.of("dog","cat","dog","dog","cow","house","house").parallel().collect( Collectors.groupingBy(e->e,Collectors.summingInt(e -> 1)));
System.out.println("number of dogs = " + wordCount.get("dog"));
Map<Object, Object> wordCount2 = Stream.of("dog","cat","dog","dog","cow","house","house").parallel().collect(Collectors.toConcurrentMap(keyWord->keyWord, keyWord->1, (oldVal,newVal)->(int)oldVal+(int)newVal));
System.out.println("number of dogs = " + wordCount2.get("dog"));
假设真正的列表会更长,可能来自文件或生成的流,并且我想知道所有单词的计数,而不仅仅是dog。
看看Collectors.groupingBy
的javadocs
@implNote返回的收集器不是并发的。对于平行流管道,合并器功能通过合并一个管道中的键来操作映射到另一个,这可能是一个昂贵的操作。如果保存元素呈现给下游的顺序收集器不是必需的,使用groupingByConcurrent(Function,供应商、收集器)可以提供更好的并行性能。
现在,看看Collectors.groupingByConcurrent
,你会发现这或多或少相当于你的第二种方法
返回实现级联"分组依据"的并发收集器对类型T的输入元素的操作,根据分类函数,然后执行缩减操作使用指定的下游收集器。收集器生成的ConcurrentMap是使用提供的工厂功能创建。
与groupingByConcurrent
和toConcurrentMap
相比,groupingBy
和toMap
在大数据集上的工作速度可能较慢。检查groupingByConcurrent
还是toConcurrentMap
更快的最好方法是在自己的数据集上对它们进行基准测试。我认为结果会大致相同。
但是,请注意,如果使用该文件作为源文件,则并行性可能会降低速度,因为在Java 8中,Files.lines()
和BufferedReader.lines()
按顺序读取文件,并行性是通过将行块预缓冲到数组中并生成新任务来实现的。这并不总是有效的,所以瓶颈可能在这个过程中。在JDK9中,Files.lines()
经过了优化(对于长度小于2Gb的常规文件),因此您可能会在那里获得更好的性能。
至于生成的源,这取决于您如何生成它们。如果你为你的来源提供好的拆分策略会更好。如果使用Stream.iterate
或Spliterators.spliterator(iterator, ...)
或扩展AbstractSpliterator
类,则默认的拆分策略是相同的:将一些元素预缓冲到数组中以生成子任务。
解释Lee的代码:
public static Map<String, Integer> wordCount(Stream<String> stream) {
return stream
.flatMap(s -> Stream.of(s.split("\s+")))
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
}
s->s:密钥映射程序
s->1:值映射器
Integer::sum:合并函数
public static Map<String, Integer> wordCount(Stream<String> stream) {
return stream
.flatMap(s -> Stream.of(s.split("\s+")))
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
}