使用收集时的并发数据结构和并行流



在经历了以下答案之后,这些答案讨论了在流中使用并发数据结构以及使用并发映射和转换为映射之间的区别,有人可以解释一下如果我使用collect的其他语法会发生什么,即

    Stream<Integer> integers = Stream.iterate(1, n -> n + 1).parallel(); 
    Map<Integer, Boolean> resultMap = integers
                                        .limit(1000)
                                        .collect(HashMap::new,
                                                (map, value) -> map.put(value, false),
                                                HashMap::putAll);

根据文档,将根据生成的线程数调用 provider。如果我使用 ConcurrentHashMap 而不是 HashMap 怎么办?

并行执行时,多个中间结果可能是 实例化、填充和合并,以保持 可变数据结构。因此,即使并行执行 使用非线程安全数据结构(如 ArrayList(,否 并行缩减需要额外的同步。

当您使用

ConcurrentHashMap 而不是使用三参数collect方法HashMap时,不会有行为变化。若要更改行为,需要一个报告CONCURRENT特征的Collector,并且无法使用临时收集器指定特征。

此外,该操作必须是无序的,以启用并行收集操作,其中所有线程都累积到单个容器中。由于流属性,该操作可能是无序的,无论是本质上的,例如,当通过无序源(如HashSet(进行流式传输时,还是通过unordered()显式地通过,例如

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .unordered()
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1,m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT));

或者由于收集器的UNORDERED特性:

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1,m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED));

后者是使用内置收集器时得到的:

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE));

当您使用地图供应商时,toConcurrentMap将始终CONCURRENTUNORDERED并且需要ConcurrentMap,而toMap永远不会CONCURRENT,即使您提供的供应商创建了ConcurrentMap实现的实例。

准确地说,即使你使用ConcurrentHashMap线程,也不会共享数据,并且会有尽可能多的ConcurrentHashMap,即 supplier会被称为不。的时代。combiner,即最后一个参数,即BiConsumer将执行合并操作,但顺序不分先后,即无论哪个线程完成向其抛出数据,然后进行合并。

当你明确地说Collectors.toConcurrentMap那么收集器的行为是单个容器(即ConcurrentHashMap(所有线程都将推送数据,不需要联合努力。

最新更新