MissingBackpressureException:由于缺少请求,无法发出缓冲区



我收到了一份错误报告,其中包括RxJavaFlowableMissingBackpressureException: Could not emit buffer due to lack of requests,但我正在努力创建一个简单的测试用例来演示问题(维护Flowable的结构(。

以下是我试图组合的测试,它在管道中保持相同的阶段:

int inputEvents=10000;
CountDownLatch completed = new CountDownLatch(1);
Flowable<List<String>> flowable = Flowable.<String>create(e -> {
System.out.println(Thread.currentThread().getName() + ": Will send");
for (int counter = 0; counter < inputEvents; counter++) {
e.onNext("" + counter);
Thread.sleep(5);
}
System.out.println(Thread.currentThread().getName() + ": Completed sending");
e.onComplete();
}, BackpressureStrategy.DROP)
.onBackpressureDrop(s -> System.out.println("Backpressure, dropping " + Arrays.asList(s)))
.buffer(1, TimeUnit.SECONDS)
.doOnNext(strings -> System.out.println("t" + Thread.currentThread().getName() + ": Buffered: " + strings.size() + " items"))
.observeOn(Schedulers.io(), false)
.doOnNext(strings -> {
System.out.println("t" + "t" + Thread.currentThread().getName() + ": Waiting: " + strings.size());
Thread.sleep(5000);
});
flowable
.subscribe(s -> System.out.println("t" + "t" + "onNext: " + s.size()),
error -> {
throw new RuntimeException(error);
},
() -> {
System.out.println("t" + "t" + "Complete");
completed.countDown();
});
completed.await();

在生产中,我们得到了具有以下堆栈跟踪的MissingBackpressureException: Could not emit buffer due to lack of requests

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber.fastPathEmitMax(QueueDrainSubscriber.java:87)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableBufferTimed$BufferExactUnboundedSubscriber.run(FlowableBufferTimed.java:207)
at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

所以我认为这与缓冲区的下游工作有关。

然而,无论我在doOnNext中阻塞多长时间,我都无法重现这个问题。示例输出:

main: Will send
RxComputationThreadPool-1: Buffered: 197 items
RxCachedThreadScheduler-1: Waiting: 197
RxComputationThreadPool-1: Buffered: 196 items
RxComputationThreadPool-1: Buffered: 197 items
RxComputationThreadPool-1: Buffered: 197 items
RxComputationThreadPool-1: Buffered: 196 items
RxComputationThreadPool-1: Buffered: 197 items
onNext: 197
RxCachedThreadScheduler-1: Waiting: 196
RxComputationThreadPool-1: Buffered: 197 items
RxComputationThreadPool-1: Buffered: 197 items
RxComputationThreadPool-1: Buffered: 196 items
RxComputationThreadPool-1: Buffered: 197 items
RxComputationThreadPool-1: Buffered: 197 items
onNext: 196
RxCachedThreadScheduler-1: Waiting: 197
RxComputationThreadPool-1: Buffered: 197 items
RxComputationThreadPool-1: Buffered: 197 items
...

我原以为,由于Thread.sleep(5000)需要很长时间,我们会恢复压力。

有没有一种方法可以模拟这种情况,最好是在测试中使用TestScheduler/TestSubscriber(以避免Thread.sleep()(?

我能够通过增加事件的发射速率、增加事件的最大数量以及降低消费者处理它们的速率来重现MissingBackpressureException。

溢出的缓冲区是大小为128的默认observeOn(...)运算符的缓冲区。由于它每秒都会收到一次新的列表,因此至少需要几分钟的背压才能溢出。

注意,您可以通过将此默认缓冲区大小作为参数传递给observeOn(...)来覆盖它。

回到背压处理,我认为您的管道的主要问题是buffer(1, TimeUnit.SECONDS)运算符。如果您查看javadoc:

背压:此运算符不支持使用背压时间它向上游请求Long.MAX_VALUE,而不服从下游请求。

由于以上原因,您的onBackPressureDrop(...)永远不会被调用。我认为你可以通过把onBackPressureDrop(...)放在buffer(...)之后来解决这个问题。这样做会产生Backpressure, dropping...消息。

您应该能够使用以下方法对此进行单元测试:TestScheduler.advanceTimeBy(long, TimeUnit)。尽管我不得不承认,我还没有尝试过。

最新更新