将Observable.create
更改为Observable.fromPublisher
不起作用。(如果没有样本,则全部订阅;如果有样本,则没有订阅。)
可观察到的。create和fromPublisher?
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class SampleMain {
public static void main(String[] args) {
Observable<String> o = Observable.create(s -> {
new Thread(() -> {
for (int i=0; i<100; i++) {
s.onNext("Hello Observable.fromPublisher() A" + i);
s.onNext("Hello Observable.fromPublisher() B" + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
s.onComplete();
}).start();
});
o
.sample(1, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
}
}
fromPublisher
需要一个正确实现的遵守规则的org.reactivestreams.Publisher
。这些Publisher
通常来自第三方库或api。
create
已经内置了基础设施,可以将更简单的发射器式API转换为Observable
,这样开发人员就不必担心底层协议的太多问题。
请注意fromPublisher
:
发布者必须遵循响应流规范。违反规范可能会导致未定义的行为。
如果可能的话,使用create(ObservableOnSubscribe)来创建一个类似源的Observable。
请注意,尽管Publisher看起来是一个功能接口,但不建议通过lambda来实现它,因为规范要求进行状态管理,而使用无状态lambda是无法实现的。