订阅者如何使用反应式拉反压力来控制发布服务器



我有一个发布服务器,它的发布速度可能比订阅服务器处理数据的速度更快。为了解决这个问题,我开始使用背压。因为我不想丢弃任何数据,所以我使用反应式拉反压力。我将此理解为订阅方可以告诉发布方何时发布更多数据,如本段和以下段落所述。

发布者是一个可流式的,它按时间顺序并行工作,然后合并为一个连续的可流式。数据最多应缓冲10个元素,当缓冲区已满时,Flowable不应再发布任何数据并等待下一个请求。

订阅者是一个DisposableSubscriber,它在开始时请求10个项目。每个消耗的物品都需要一些计算,然后会请求一个新的物品。

我的MWE是这样的:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Flowable.fromIterable(src)
.parallel(10, 1)
.runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
.flatMap(i -> Single.fromCallable(() -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
}).toFlowable())
.sequential(1)
.onBackpressureBuffer(10)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}

我期望此代码执行以下操作:订阅者请求前10个项目。出版商出版了前10篇文章。然后,订阅者在onNext中进行计算,并请求更多项目,发布者将发布这些项目。

实际情况:起初,出版商似乎无限制地发布项目。在某个时刻,例如在发布了14个项目之后,订户处理其第一个项目。在这种情况下,发布者将继续发布项目。在发布了大约30个项目之后,抛出一个io.reactivex.exceptions.MissingBackpressureException: Buffer is full,流结束。

我的问题:我做错了什么如何让订阅者控制发布者是否以及何时发布数据很明显,我做错了什么。否则,预期与现实不会有如此大的不同。

上述MWE的输出示例:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

不是Rx方面的专家,但让我尝试一下。observeOn(...)有自己的默认缓冲区大小128。因此,从一开始,它将从上游请求比缓冲区所能容纳的更多的数据。

observeOn(...)接受一个可选的缓冲区大小重写,但即使您提供了它,ParallelFlowable也会比您想要的更频繁地调用flatMap(...)方法。我不能100%确定为什么,也许它有自己的内部缓冲,它在将轨道合并回序列时执行。

我认为,通过使用flatMap(...)而不是parralel(...),提供maxConcurrency参数,可以更接近您想要的行为。

需要记住的另一件事是,您不想调用subscribeOn(...)——这意味着要影响整个上游Flowable。所以,如果你已经在调用parallel(...).runOn(...),它没有任何效果,或者效果会出乎意料。

有了以上内容,我认为这会让你更接近你想要的:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
Flowable.fromIterable(src)
.flatMap(
i -> Flowable.just( i )
.subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
.map( __ -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
} ),
10) // max concurrency
.observeOn(Schedulers.newThread(), false, 10) // override buffer size
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}

最新更新