支持RxJava的连锁改造服务



我在使用retrofit的RxJava支持链接可观察对象时遇到了麻烦。我可能误解了如何使用它,否则它可能是改造中的错误。希望这里有人能帮我弄明白这是怎么回事。编辑:我对这些响应使用MockRestAdapter -这可能是相关的,因为我看到RxSupport实现略有不同。

这是一个假的银行应用程序。它正在尝试进行转账,在转账完成后,它应该做一个帐户请求来更新帐户值。这只是我尝试flatMap的一个借口。不幸的是,下面的代码不起作用,没有订阅者得到通知:

案例1:链接两个改进生成的可观察对象

传输服务(注意:返回一个改进生成的可观察对象):

@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
                                             @Field("from_account_number") String fromAccountNumber,
                                             @Field("to_account_number") String toAccountNumber,
                                             @Field("amount") String amount);

account服务(注意:返回一个改进生成的可观察对象):

@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

将两个改进生成的可观察对象链接在一起:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

案例2:创建我自己的可观察对象并与一个改进生成的对象链接

如果我忽略了Retrofit中内置的Rx对"平面映射"调用的支持,它可以完美地工作!所有订阅者都会收到通知。见下文:

new accounts服务(注意:不产生一个可观察对象):

@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

创建我自己的可观察对象并自己发出项:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

任何帮助都将非常感激!

答案是肯定的,你应该能够从Retrofit链观察。在MockRestAdapter$MockRxSupport:createMockObservable私有类中似乎有一个bug。将订阅者订阅到可观察对象的调度方式似乎是错误的。在HttpExecutor线程本身启动之后,才会订阅可观察对象。我相信来自Schedulers.io()线程的原始流在mockHandler之前完成并取消订阅。invokeSync返回的Observable可以被订阅。如果您看一下retrofit-mock模块中的代码,希望这个解释对您有所帮助。

作为当前代码的解决方案,当使用retrofit-mock时,您可以用自己的ImmediateExecutor实现替换内部默认Executor。这将允许至少在测试mock时具有单个线程流,这将由Schedulers.io提供。

// ImmediateExecutor.java
public class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}
// Create your RestAdapter with your ImmdiateExecutor
RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();

至于在源代码中修复问题,您还可以在项目中包含retrofit-mock项目作为源代码,并使用下面的代码修改MockRestAdapter$MockRxSupport:createMockObservable方法。我已经测试了你的用例,它确实解决了这个问题。

—MockRestAdapter.java$MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
        final RequestInterceptor interceptor, final Object[] args) {
      return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override public void call(final Subscriber<? super Object> subscriber) {
          try {
            if (subscriber.isUnsubscribed()) return;
            Observable observable =
                (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);
            observable.subscribeOn(Schedulers.from(httpExecutor));
            //noinspection unchecked
            observable.subscribe(subscriber);
          } catch (RetrofitError e) {
            subscriber.onError(errorHandler.handleError(e));
          } catch (Throwable e) {
            subscriber.onError(e);
          }
        }
      });
    }

在这里对Retrofit项目提出了一个问题,我们看看他们是否接受。

相关内容

  • 没有找到相关文章

最新更新