我开始使用Java 9 Flow API,我发现但不喜欢的第一件事是,当我们将订阅者实现传递到发布服务器时,似乎我们不能使用lambda,就像我们可以用RxJava做
的那样。所以我必须定义和实现我自己的订阅者类
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
然后只需将其传递给我的发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());
这真的很冗长,据我了解,这是因为我们需要在onSubscribe
回调中设置订阅
protected Flow.Subscription subscription;
以后用于onNext
继续排放subscription.request(1);
我仍然不明白为什么需要这种机制,但它避免了像我们在 RxJava 中所做的那样使用 Lambdas,就像这个例子一样
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));
我想这是不可能的,我在这里没有遗漏任何东西,对吧?
我仍然不明白为什么需要这种机制
订阅支持从订阅服务器到发布者的通信。request
方法允许订户施加背压,通知上游组件它过载并且"需要休息"。为此,订阅者需要保留订阅的实例,并且需要偶尔调用request
以获取更多项目。
无压力Subscriber
如果你有一个用例,你不需要应用背压,并希望从降低的复杂性中受益,你可以实现一个LaidBackSubscriber
,它:
- 通过存储订阅并立即调用
request
来实现onSubscribe
- 通过执行构造期间给定的 lambda 然后调用
subscription.request(1)
来实现onNext
- 通过执行构造期间给出的 lambda 来实现
onError
和onComplete
这应该能让你得到你想要的。
一般建议
Java 9 Flow API 是作为现有异步库的集成点创建的,而不是作为以临时方式实现响应式组件的邀请。尝试起来很棒,但是如果你真的想创建一个反应式系统,现有的库可能非常适合。
Java 9 Flow API 是一组从非响应式到响应式世界的 4 个接口和 1 个桥接类。没有运算符,没有方便的lambda版本,没有别的。
从理论上讲,它的引入是为了允许JDK本身基于反应式原理构建内部组件,但没有令人放心的迹象正在发生。
因此,用户负责在此API上构建困难,乏味且容易出错的组件。你最好等待主流库发布兼容版本,或者坚持使用更可用的基于反应 Streams.Org 的库,如RxJava 2和Reactor 3。
如果你仍然有兴趣在Flow API之上手动构建,你可以看看我的研究/原型库Reactive4JavaFlow,它实现了所需的lambda重载。