Java Stream GroupBy and Reduce



我有一个 Item 类,其中包含代码、数量和金额字段,以及可能包含许多项目(具有相同代码)的项目列表。我想按代码对项目进行分组并汇总它们的数量和数量。

我能够使用流的groupingByreduce实现其中的一半。分组是有效的,但减少是将所有分组的项目减少到一个通过不同代码重复的单个项目(groupingBy键)。

这里不应该减少地图中每个代码的项目列表吗?为什么它为所有人重新调整相同的组合项目。

下面是一个示例代码。

import java.util.List;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.Map;
class HelloWorld {
public static void main(String[] args) {
List<Item> itemList = Arrays.asList(
createItem("CODE1", 1, 12),
createItem("CODE2", 4, 22),
createItem("CODE3", 5, 50),
createItem("CODE4", 2, 11),
createItem("CODE4", 8, 20),
createItem("CODE2", 1, 42)
);

Map<String, Item> aggregatedItems = itemList
.stream()
.collect(Collectors.groupingBy(
Item::getCode,
Collectors.reducing(new Item(), (aggregatedItem, item) -> {
int aggregatedQuantity = aggregatedItem.getQuantity();
double aggregatedAmount = aggregatedItem.getAmount();

aggregatedItem.setQuantity(aggregatedQuantity + item.getQuantity());
aggregatedItem.setAmount(aggregatedAmount + item.getAmount());

return aggregatedItem;
})
));

System.out.println("Map total size: " + aggregatedItems.size()); // expected 4
System.out.println();
aggregatedItems.forEach((key, value) -> {
System.out.println("key: " + key);
System.out.println("value - quantity: " + value.getQuantity() + " - amount: " + value.getAmount());
System.out.println();
});
}

private static Item createItem(String code, int quantity, double amount) {
Item item = new Item();
item.setCode(code);
item.setQuantity(quantity);
item.setAmount(amount);
return item;
}
}
class Item {
private String code;
private int quantity;
private double amount;

public Item() {
quantity = 0;
amount = 0.0;
}

public String getCode() { return code; }
public int getQuantity() { return quantity; }
public double getAmount() { return amount; }

public void setCode(String code) { this.code = code; }
public void setQuantity(int quantity) { this.quantity = quantity; }
public void setAmount(double amount) { this.amount = amount; }
}

下面是输出。

Map total size: 4
key: CODE2
value - quantity: 21 - amount: 157.0
key: CODE1
value - quantity: 21 - amount: 157.0
key: CODE4
value - quantity: 21 - amount: 157.0
key: CODE3
value - quantity: 21 - amount: 157.0

可变缩减与不可变还原

在这种情况下,Collectors.reducing()不是正确的工具,因为它意味着不可变的还原,即执行折叠操作,其中每个缩减步骤都会导致创建一个新的不可变对象。

但是,您不是在每个缩减步骤中生成一个新对象,而是更改作为标识提供的对象的状态。

因此,您得到的结果不正确,因为每个线程只会创建一次标识对象。Item的这个实例用于累积,对它的引用最终出现在映射的每个值中。

可以在流 API 文档中找到更详细的信息,特别是在以下部分中:缩减可变减少

这里有一个简短的引文来解释Stream.reduce()是如何工作的(Collectors.reducing()背后的机制是相同的):

累加器函数采用部分结果和下一个元素,并生成新的部分结果

使用可变缩减

这个问题可以通过生成一个新的Item实例来解决,同时累积映射到同一的值,但性能更高的方法是改用可变归约

为此,您可以实现通过静态方法创建的自定义收集器Collector.of()

Map<String, Item> aggregatedItems = itemList.stream()
.collect(Collectors.groupingBy(
Item::getCode,
Collector.of(
Item::new,   // mutable container of the collector
Item::merge, // accumulator - defines how stream data should be accumulated
Item::merge  // combiner - mergin the two containers while executing stream in parallel
)
));

为方便起见,您可以引入负责累积两个项目属性的方法merge()。它将允许避免在累加器和组合器中重复相同的逻辑,并保持收集实现的精简和可读性。

public class Item {
private String code;
private int quantity;
private double amount;

// getters, constructor, etc.

public Item merge(Item other) {
this.quantity += other.quantity;
this.amount += other.amount;
return this;
}
}

不得将输入参数修改为Collectors.reducingnew Item()只执行一次,所有归约操作将共享同一个"聚合实例"。换句话说:映射将包含相同的值实例 4 次(您可以使用System.identityHashCode()或通过比较引用相等性来轻松检查自己:aggregatedItems.get("CODE1") == aggregatedItems.get("CODE2"))。

而是返回一个新的结果实例:

final Map<String, Item> aggregatedItems = itemList
.stream()
.collect(Collectors.groupingBy(
Item::getCode,
Collectors.reducing(new Item(), (item1, item2) -> {
final Item reduced = new Item();
reduced.setQuantity(item1.getQuantity() + item2.getQuantity());
reduced.setAmount(item1.getAmount() + item2.getAmount());
return reduced;
})
));

输出:

Map total size: 4
key: CODE2
value - quantity: 5 - amount: 64.0
key: CODE1
value - quantity: 1 - amount: 12.0
key: CODE4
value - quantity: 10 - amount: 31.0
key: CODE3
value - quantity: 5 - amount: 50.0

> 您正在使用reducing,它假设您不会改变传入的累加器。reducing不会为每个新组为您创建新的Item,而是希望您创建新的Item并在 lambda 中返回它们,如下所示:

// this works as expected
.collect(Collectors.groupingBy(
Item::getCode,
Collectors.reducing(new Item(), (item1, item2) -> createItem(
item1.getCode(),
item1.getQuantity() + item2.getQuantity(),
item1.getAmount() + item2.getAmount()
))
));

因此,如果您使用不可变的对象(如数字或字符串),则非常适合。

由于您没有在代码中创建新的Item,因此reducing会继续重用相同的实例,从而导致您看到的行为。

如果要更改对象,可以使用Collector.of以线程安全的方式执行可变减少:

.collect(Collectors.groupingBy(
Item::getCode,
Collector.of(Item::new, (aggregatedItem, item) -> {
int aggregatedQuantity = aggregatedItem.getQuantity();
double aggregatedAmount = aggregatedItem.getAmount();
aggregatedItem.setQuantity(aggregatedQuantity + item.getQuantity());
aggregatedItem.setAmount(aggregatedAmount + item.getAmount());
}, (item1, item2) -> createItem(
item1.getCode(),
item1.getQuantity() + item2.getQuantity(),
item1.getAmount() + item2.getAmount()
))
));

请注意,您现在将引用传递给Item的构造函数,即在必要时创建新Item的方法,而不仅仅是单个new Item()。此外,您还提供了第三个参数(合并器),该参数告诉收集器如何从两个现有项创建新项,如果在并发情况下使用此收集器,则将使用这些项。(有关合路器的更多信息,请参见此处)

Collector.ofCollectors.reducing之间的这种对比,就是Stream.reduceStream.collect的对比。在此处了解更多信息。

最新更新