Java 9 Flow 使用 lambda 定义订阅者



我开始使用Java 9 Flow API,我发现但不喜欢的第一件事是,当我们将订阅者实现传递到发布服务器时,似乎我们不能使用lambda,就像我们可以用RxJava做

的那样。所以我必须定义和实现我自己的订阅者类

public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}

然后只需将其传递给我的发布者

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());

这真的很冗长,据我了解,这是因为我们需要在onSubscribe回调中设置订阅

protected Flow.Subscription subscription;     

以后用于onNext继续排放subscription.request(1);

我仍然不明白为什么需要这种机制,但它避免了像我们在 RxJava 中所做的那样使用 Lambdas,就像这个例子一样

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));

我想这是不可能的,我在这里没有遗漏任何东西,对吧?

我仍然不明白为什么需要这种机制

订阅支持从订阅服务器到发布者的通信。request方法允许订户施加背压,通知上游组件它过载并且"需要休息"。为此,订阅者需要保留订阅的实例,并且需要偶尔调用request以获取更多项目。

无压力Subscriber

如果你有一个用例,你不需要应用背压,并希望从降低的复杂性中受益,你可以实现一个LaidBackSubscriber,它:

  • 通过存储订阅并立即调用request来实现onSubscribe
  • 通过执行构造期间给定的 lambda 然后调用subscription.request(1)来实现onNext
  • 通过执行构造期间给出的 lambda 来实现onErroronComplete

这应该能让你得到你想要的。

一般建议

Java 9 Flow API 是作为现有异步库的集成点创建的,而不是作为以临时方式实现响应式组件的邀请。尝试起来很棒,但是如果你真的想创建一个反应式系统,现有的库可能非常适合。

Java 9 Flow API 是一组从非响应式到响应式世界的 4 个接口和 1 个桥接类。没有运算符,没有方便的lambda版本,没有别的。

从理论上讲,它的引入是为了允许JDK本身基于反应式原理构建内部组件,但没有令人放心的迹象正在发生。

因此,用户负责在此API上构建困难,乏味且容易出错的组件。你最好等待主流库发布兼容版本,或者坚持使用更可用的基于反应 Streams.Org 的库,如RxJava 2和Reactor 3。

如果你仍然有兴趣在Flow API之上手动构建,你可以看看我的研究/原型库Reactive4JavaFlow,它实现了所需的lambda重载。

最新更新