RxJava中订阅Observable的问题



我遇到一个问题,我的程序无法从subscribe()方法中的Observable获得结果。

我正在尝试构建一个简单的控制台应用程序,它向服务器运行请求,然后打印以控制台结果。我看到一切都很好,但我就是无法在subsribe()中得到结果。应用程序似乎在结果返回到方法之前就完成了。

这是我运行rquest:的代码

coffeeShopApi.getCoffeeShops("")
.subscribeOn(Schedulers.io())
.subscribe({
state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
}, {
it.printStackTrace()
state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
})

执行完此代码后,程序将以退出代码0结束。它也是从主线程中的主函数运行的。这里可能有什么问题?

代码的问题是,您在另一个线程上订阅了当前线程(main(。因此,您的主线程在调用单个项目的订阅之前完成。

下面是一个带有堆栈的示例代码:

System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
error -> error.printStackTrace());
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + ": stop...");

Stacktrace:

main: start...
RxCachedThreadScheduler-1: a
RxCachedThreadScheduler-1: b
RxCachedThreadScheduler-1: c
main: stop...

结果表明,observable是在main线程中创建的。订阅的执行是在不同的线程RxCachedThreadScheduler-1中完成的。

在同一线程上执行的示例:

System.out.println(Thread.currentThread().getName() + ": start...");
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
error -> error.printStackTrace());
System.out.println(Thread.currentThread().getName() + ": stop...");

Stacktrace:

main: start...
main: a
main: b
main: c
main: stop...

根据使用情况,可能需要等待流执行完成,然后再进一步处理该方法,例如使用同一线程或FutureObserver。在其他用例中,将计算成本高昂的操作外包到不同的线程中是可行的解决方案。


对于它是守护进程线程的问题?答案是。我附上了测试代码:

String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);
Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
stream.subscribeOn(Schedulers.io())
.subscribe(next -> {
final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
System.out.println(innerMessage);
},
Throwable::printStackTrace);
Thread.sleep(1000L);
message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
System.out.println(message);

堆垛机输出:

main-isDaemon?false: start...
RxCachedThreadScheduler-1-isDaemon?true: a
RxCachedThreadScheduler-1-isDaemon?true: b
RxCachedThreadScheduler-1-isDaemon?true: c
main-isDaemon?false: stop...

相关内容

  • 没有找到相关文章

最新更新