如何使用状态过滤通量



我想根据以前的值计算的状态,在Flux上应用filter。然而,根据javadoc ,建议避免在运算符中使用state

请注意,应避免在Flux运算符中使用java.util.function/lambdas中的state,因为这些状态可能在多个订阅服务器之间共享。

例如,Flux#distinct会筛选先前出现的项目。我们如何实现自己版本的distinct

我找到了问题的答案。Flux#distinct可以取一个提供初始状态的Supplier和一个执行"不同"检查的BiPredicate,因此我们可以在存储中存储任意状态,并决定是否保留每个元素。

以下代码显示了如何在不更改顺序的情况下保留每个mod2组的前3个元素。

// Get first 3 elements per mod 2.
Flux<Integer> first3PerMod2 =
Flux.fromIterable(ImmutableList.of(9, 3, 7, 4, 5, 10, 6, 8, 2, 1))
.distinct(
// Group by mod2
num -> num % 2,
// Counter to store how many elements have been processed for each group.
() -> new HashMap<Integer, Integer>(),
// Increment or set 1 to the counter,
// and return whether 3 elements are published.
(map, num) -> map.merge(num, 1, Integer::sum) <= 3,
// Clean up the state.
map -> map.clear());
StepVerifier.create(first3PerMod2).expectNext(9, 3, 7, 4, 10, 6).verifyComplete();

最新更新