RxJava没有调用subscribe(),但仍在工作.这怎么可能



我正在使用RxJava和Reform来使用不同的端点。我正在使用几个微服务,它们都使用RxJava和Reform来使用其他服务。

我没有使用Observables的经验,所以我正在互联网上查看一些例子,学习如何使用它,并自己创建一些服务。我看到方法subscribe((总是被使用。类似这样的东西:

@Setter
@Getter
private MovieDetail movieDetail;
public Observable<Movies> observe() {
allMoviesClientRetrofit
.getMovies()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(new Observer<Movies>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Movies movies) {
allMovies = movies;
});

在我工作的服务中,我到处搜索,从未使用过subscribe((,但一切都正常。这怎么可能?

正如你所看到的,在这个例子中,我需要返回一个Observable来保持我的个人代码与我工作中的代码一致,但如果我使用subscribe((方法,它会返回一个Subscription对象,但这不起作用。

这是我工作代码的一部分,你可以看到subscribe((从未被调用,但它能正常工作

@GetMapping(
value = "/something",
produces = MediaType.APPLICATION_JSON_UTF8_VALUE
)
public Single<ResponseEntity<Something>> getSomething() {

return retrieveSomethingFactory
.observe()
.toSingle()
.map(something -> {
return ResponseEntity
.status(httpStatus)
.body(something);
});

class retrieveSomethingFactoryImpl implements retrieveSomethingFactory
@Override
public Observable<Something> observe() {
return Observable
.defer(() -> {
Observable<Something1> something1 = retrieveSomething1Factory
.call(link) // Retrofit call
.observe()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
Observable<Something2> something2 = retrieveSomething1Factory
.call(link) // Retrofit call
.observe()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation()); 
return Observable
.zip(something1, something2.toList(), (something1, something2) -> {
....
....
....        
return something;
});                                                       

感谢

retrieveSomethingFactory.observe()调用返回的Observable似乎是一个热的可观察对象,这意味着无论是否订阅,它都会发出项目。你可以在这里读到一篇关于冷热可观测性的好文章。

最新更新