一次性对象在订阅后为null



在下面的代码中,我创建了一个使用Rx学习函数编程的示例。我正在尝试将HandlerThread处理为可观察的。在onResume()中,我订阅了Single.justobservable以启动HandlerThread

对于SingleObserver回调,尽管接收到onSuccess()中的值,但onSubscribe()中的Disposable对象始终是null

我还发布了日志。请看一下,并告诉我为什么Disposable对象dnull

代码

onResume() {
this.mMyHandlerThreadInitSingleObs = Single.just(this.getInitializedHandlerThread())
.map(myHandlerThread->{
Log.d(TAG_LOG, "BEFORE .start()");
myHandlerThread.start();
Log.d(TAG_LOG, "AFTER .start()");
return this.mMyHandlerThread;
});
this.mMyHandlerThreadInitSingleObs
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this.getSingleObserver());
}
private SingleObserver<HandlerThread> getSingleObserver() {
String TAG_LOG = ActMain.TAG_LOG + "." + "getSingleObserver()";
return new SingleObserver<HandlerThread>() {
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG_LOG, "[onSubscribe] d: " + d);
}
@Override
public void onSuccess(HandlerThread is) {
Log.v(TAG_LOG, "[onSuccess] is: " + is);
}
@Override
public void onError(Throwable e) {
Log.e(TAG_LOG, "[onError]");
}
};
}

logcat

2018-12-22 14:56:50.329 12611-12611 V/ActMain: onStart
2018-12-22 14:56:50.332 12611-12611 V/ActMain.MyHandlerThread: constructor called
2018-12-22 14:56:50.333 12611-12611 V/ActMain.getSingleObserver(): [onSubscribe] d: null//<--------------------
2018-12-22 14:56:50.349 12611-12611 D/ActMain.onResume(): BEFORE .start()
2018-12-22 14:56:50.349 12611-12611 D/ActMain.onResume(): AFTER .start()
2018-12-22 14:56:50.350 12611-12630 V/ActMain.MyHandlerThread.onLooperPrepared: ..
2018-12-22 14:56:50.350 12611-12630 D/ActMain.MyHandlerThread.onLooperPrepared: this.getLooper(): Looper (my HandlerThread, tid 416) {2f35ee2}
2018-12-22 14:56:50.363 12611-12630 I/ActMain.MyHandlerThread.onLooperPrepared: [onSubscribe] d: null
2018-12-22 14:56:50.377 12611-12633 D/ActMain.MyHandlerThread.onLooperPrepared.emitter.onComplete():: this.getLooper() initialized: Looper (my HandlerThread, tid 416) {2f35ee2}
2018-12-22 14:56:50.377 12611-12633 I/ActMain.MyHandlerThread.onLooperPrepared: [onComplete]
2018-12-22 14:56:50.425 12611-12611 V/ActMain.getSingleObserver(): [onSuccess] is: Thread[my HandlerThread,5,main]
2018-12-22 14:56:50.514 1700-1724 I/ActivityManager: Displayed com.example.amrbakri.rxhandlerthread_01/.ActMain: +340ms

事实证明,这是一个有趣的问题。来自RxJava2(单)的文档

Single的行为与Observable类似,只是它只能发射单个成功值或错误(没有"onComplete"对于Observable存在的通知)。

Single类实现了SingleSource基接口和它通过subscribe(SingleObserver)方法。

看起来您使用的是SingleObserver而不是DisposableSingleObserver

文件中提到:

请注意,根据设计,通过订阅(SingleObserver)进行的订阅不能从外部处理(因此subscribe(SingleObserver)方法)SingleObserver的实现者允许这种情况发生。RxJava通过标准DisposableSingleObserver支持这种使用例子为了方便起见,subscribeWith(SingleObserver)方法是还提供了允许使用SingleObserver(或子类)要以流畅的方式应用的实例(例如在示例中以上)。

所以试着这样做:

Disposable d = Single.just("Hello World")
.delay(10, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableSingleObserver<String>() {
@Override
public void onStart() {
System.out.println("Started");
}
@Override
public void onSuccess(String value) {
System.out.println("Success: " + value);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});

最新更新