如何使用Java8流和lambdas进行并行的唯一单词计数



用Java 8流和lambdas进行并行唯一单词计数的最佳方法是什么?

我已经想出了几个,但我不相信它们是最优的。我知道Hadoop上的map reduce解决方案,不知道它们是否提供了相同的并行性。

// Map Reduce Word Count 
Map<String, Integer> wordCount = Stream.of("dog","cat","dog","dog","cow","house","house").parallel().collect( Collectors.groupingBy(e->e,Collectors.summingInt(e -> 1)));
System.out.println("number of dogs = " + wordCount.get("dog"));
Map<Object, Object> wordCount2 = Stream.of("dog","cat","dog","dog","cow","house","house").parallel().collect(Collectors.toConcurrentMap(keyWord->keyWord, keyWord->1, (oldVal,newVal)->(int)oldVal+(int)newVal));
System.out.println("number of dogs = " + wordCount2.get("dog"));

假设真正的列表会更长,可能来自文件或生成的流,并且我想知道所有单词的计数,而不仅仅是dog。

看看Collectors.groupingBy 的javadocs

@implNote返回的收集器不是并发的。对于平行流管道,合并器功能通过合并一个管道中的键来操作映射到另一个,这可能是一个昂贵的操作。如果保存元素呈现给下游的顺序收集器不是必需的,使用groupingByConcurrent(Function,供应商、收集器)可以提供更好的并行性能。

现在,看看Collectors.groupingByConcurrent,你会发现这或多或少相当于你的第二种方法

返回实现级联"分组依据"的并发收集器对类型T的输入元素的操作,根据分类函数,然后执行缩减操作使用指定的下游收集器。收集器生成的ConcurrentMap是使用提供的工厂功能创建。

groupingByConcurrenttoConcurrentMap相比,groupingBytoMap在大数据集上的工作速度可能较慢。检查groupingByConcurrent还是toConcurrentMap更快的最好方法是在自己的数据集上对它们进行基准测试。我认为结果会大致相同。

但是,请注意,如果使用该文件作为源文件,则并行性可能会降低速度,因为在Java 8中,Files.lines()BufferedReader.lines()按顺序读取文件,并行性是通过将行块预缓冲到数组中并生成新任务来实现的。这并不总是有效的,所以瓶颈可能在这个过程中。在JDK9中,Files.lines()经过了优化(对于长度小于2Gb的常规文件),因此您可能会在那里获得更好的性能。

至于生成的源,这取决于您如何生成它们。如果你为你的来源提供好的拆分策略会更好。如果使用Stream.iterateSpliterators.spliterator(iterator, ...)或扩展AbstractSpliterator类,则默认的拆分策略是相同的:将一些元素预缓冲到数组中以生成子任务。

解释Lee的代码:

public static Map<String, Integer> wordCount(Stream<String> stream) {
    return stream
       .flatMap(s -> Stream.of(s.split("\s+")))
       .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); 
}

s->s:密钥映射程序

s->1:值映射器

Integer::sum:合并函数

public static Map<String, Integer> wordCount(Stream<String> stream) {
    return stream
       .flatMap(s -> Stream.of(s.split("\s+")))
       .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); 
}

相关内容

  • 没有找到相关文章

最新更新