Java Flux.buffer 在缓冲请求数量的元素之前不返回任何结果



下面的示例仅在缓冲了前 5 个元素后返回缓冲区。如何让它也返回前 5 个元素的缓冲区?

@Test
public void testBuffer() {
Flux.range(0, 10)
.buffer(5,1)
.doOnNext(i -> log.info("tape {}", i))
.blockLast();
}

返回:

tape [0, 1, 2, 3, 4]
tape [1, 2, 3, 4, 5]
tape [2, 3, 4, 5, 6]
tape [3, 4, 5, 6, 7]
tape [4, 5, 6, 7, 8]
tape [5, 6, 7, 8, 9]
tape [6, 7, 8, 9]
tape [7, 8, 9]
tape [8, 9]
tape [9]

期望的结果:

tape [0]
tape [0, 1]
tape [0, 1, 2]
tape [0, 1, 2, 3]
tape [0, 1, 2, 3, 4]
tape [1, 2, 3, 4, 5]
tape [2, 3, 4, 5, 6]
tape [3, 4, 5, 6, 7]
tape [4, 5, 6, 7, 8]
tape [5, 6, 7, 8, 9]
tape [6, 7, 8, 9]
tape [7, 8, 9]
tape [8, 9]
tape [9]

我不知道有任何可用的运算符可以做这样的事情。

但是,可以组合多个运算符来产生此行为:

  1. 使用Flux#scan创建滑动/增长缓冲区。扫描对于以非常可自定义的方式累积源值非常有用。但是请注意,一旦收到最后一个值,就不可能触发更多的缓冲/值。因此:
  2. 我们需要创建一个简单的运算符,该运算符获取最后一个发出的缓冲区,并重播它,逐个删除标题值。
  3. 我们需要结合 1. 和 2. 以获得整个滑动。请注意,由于我没有找到任何运算符来连接由另一个的最后一个发出值产生的通量(本质上,一个将接收前一个通量最后一个值的 concatWith),我将在这里使用缓存。

警告我绝对没有衡量过这种策略对性能的影响,它可能很糟糕。

这是一个工作原型:

import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Arrays;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
public class ReactorAccumulator {
public static void main(String[] args) {
Flux<Integer[]> growingSlider = Flux.range(0, 10)
.scan(new SlidingBuffer<>(5, Integer[]::new), SlidingBuffer::accumulate)
.skip(1)
.map(SlidingBuffer::getBuffer);
print("Growing slider", growingSlider);
var decreasingSlider = growingSlider.last()
.flatMapMany(buffer -> slideDecrease(buffer, 1));
print("Decreasing slider", decreasingSlider);
var slider = thenDecrease(growingSlider);
print("Growing then decreasing slider", slider);
}
private static <V> void print(String title, Flux<V[]> dataStream) {
var nl = System.lineSeparator();
var log = dataStream
.map(Arrays::toString)
.collect(Collectors.joining(nl, "-- "+ title + nl, nl + "--"))
.block(Duration.ofSeconds(1));
System.out.println(log);
}
private static <V> Flux<V[]> slideDecrease(V[] buffer, int startIndex) {
if (buffer.length <= startIndex) return Flux.empty();
else return Flux.range(startIndex, buffer.length - startIndex)
.map(from -> Arrays.copyOfRange(buffer, from, buffer.length));
}
private static <V> Flux<V[]> thenDecrease(Flux<V[]> dataStream) {
var cacheLast = dataStream.cache(1);
return cacheLast.concatWith(cacheLast.last()
.flatMapMany(buffer -> slideDecrease(buffer, 1)));
}
static final class SlidingBuffer<V> {
private final int maxSize;
private final V[] buffer;
SlidingBuffer(int maxSize, IntFunction<V[]> creator) {
this(maxSize, creator.apply(0));
}
private SlidingBuffer(int maxSize, V[] buffer) {
this.maxSize = maxSize;
this.buffer = buffer;
}
SlidingBuffer<V> accumulate(V newValue) {
final V[] nextBuffer = (buffer.length < maxSize)
? Arrays.copyOf(buffer, buffer.length + 1)
: Arrays.copyOfRange(buffer, 1, maxSize + 1);
nextBuffer[nextBuffer.length - 1] = newValue;
return new SlidingBuffer<>(maxSize, nextBuffer);
}
V[] getBuffer() {
// Def copy: might not be needed if you trust consumer code not to modify the array directly
return Arrays.copyOf(buffer, buffer.length);
}
}
}

这将产生输出:

-- Growing slider
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
--
-- Decreasing slider
[6, 7, 8, 9]
[7, 8, 9]
[8, 9]
[9]
--
-- Growing then decreasing slider
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
[6, 7, 8, 9]
[7, 8, 9]
[8, 9]
[9]
--

这个答案的灵感来自阿曼宁的答案,但我没有在递增的缓冲区末尾附加一个递减的缓冲区,而是在已经存在的Flux.buffer前面附加一个递增的缓冲区。

此解决方案还使用Flux.scan来创建递增缓冲区,但我使用Flux.publish().refCount(2)创建两个 Flux 管道,然后使用Flux.mergeSequential将它们连接在一起。首先,我让增加的缓冲区在列表小于缓冲区大小时发出列表,然后将其连接到Flux.buffer列表:

public static <T> Flux<List<T>> slidingBuffer(Flux<T> flux, int maxSize) {
Flux<T> sharedFlux = flux.publish().refCount(2);
Flux<ArrayList<T>> increasingBufferFlux = sharedFlux
.scan(new ArrayList<T>(), (list, t) -> {
ArrayList<T> nextItem = new ArrayList<>(list);
nextItem.add(t);
return nextItem;
})
.skip(1) // first emitted item of Flux.scan is the initial empty ArrayList
.take(maxSize - 1); // We only need the results smaller than maxSize.
Flux<List<T>> decreasingBufferFlux = sharedFlux.buffer(maxSize, 1);
return Flux.mergeSequential(increasingBufferFlux, decreasingBufferFlux);
}

测试输出:

@Test
public void slidingBufferTest() {
Flux.range(0, 10)
.transform(flux -> slidingBuffer(flux, 5))
.doOnNext(System.out::println)
.blockLast();
}
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
[6, 7, 8, 9]
[7, 8, 9]
[8, 9]
[9]

编辑:我用Flux.publish().refCount(2)替换了Flux.share()。 一旦第一个通量(递增 BufferFlux)订阅,share将立即发出项目,而不是等待第二个通量订阅,从而导致原始通量被评估两次。这是通过使用refCount(int)修复的,它只会在有 int 订阅者时发出。

最新更新