RxJava .toblocking.tosingle()根本不返回



我正在尝试对返回不同类型响应的两个不同REST服务进行并行调用。所以我使用了observable.zip(...).toBlocking().single(),但是它从来没有返回。

这是我正在做的…

  1. 尝试使用rx zip对不同的服务进行并行调用

服务我正在尝试呼叫1. orderInfo2. billInfo

    @Service
    public class ParallelServiceCallImpl {
    ...//var declaration ...
        Observable<OrderInfoResponse> getOrderInfoRsp(ServiceRequest serviceRequest) {
        return Observable.create((rx.Subscriber <? super OrderInfoResponse> s) -> {
        OrderInfoResponse ordrInfo = orderComponent.getOrderDetails(serviceRequest); // this component calls to end service
        });
    }
    Observable<BillInfoResponse> getBillInfoRsp(ServiceRequest serviceRequest) {
        return Observable.create((rx.Subscriber <? super BillInfoResponse> s) -> {
        BillInfoResponse billInfo = billInfoComponent.getBillingDetails(serviceRequest); // this component calls to end service
        });
    }

    public ServiceEndResponse getServiceBillFinance(ServiceRequest serviceRequest) {
        Observable<OrderInfoResponse> orderObservable = getOrderInfoRsp(serviceRequest);
        Observable<BillInfoResponse> billObservable = getBillInfoRsp(serviceRequest);
        Observable<ServiceEndResponse> responseObservable = Observable.zip(
            orderObservable,
            billObservable,
            (ordrInfo, billInfo) -> {
                ServiceEndResponse serviceEndResponse = serviceMapper.endResponseMapper(ordrInfo, billInfo);
                return serviceEndResponse;
            }
        );
        ServiceEndResponse serviceResponse = responseObservable.toBlocking().single();  // Not getting response 
        return serviceResponse;
    }
}

请让我知道我在这里错过了什么,为什么它不返回serviceResponse

新增一个

我想在上面再加一条…我在ProductComponent中有getProducts方法,它为所有productIds并行调用ProductInfo服务

    public List<ProductsInfo> getProducts(ServiceRequest serviceRequest, List<String> productIds)   {
        List<ProductsInfo> lstProductInfo = new ArrayList<ProductsInfo>();
        List<Observable<ProductsInfo>> lstPrd = new ArrayList<Observable<ProductsInfo>>(productIds.size());
        for (String prdID : productIds) {
            Observable<ProductsInfo> prd = getProductObservable(serviceRequest, prdID); //Observable
            lstPrd.add(prd);
        }
        Iterable<Observable<ProductsInfo>> iterable = (Iterable<Observable<ProductsInfo>>)lstPrd;
        List<ProductsInfo> listofProd = Observable.zip(iterable, new FuncN<List<ProductsInfo>>(){
            public List<ProductsInfo> call(Object...args)   {
                for (Object arg : args) { lstProductInfo.add((ProductsInfo)arg);}
                return lstProductInfo;
            }
        }).toBlocking().single();
        return lstProductInfo;  
    }       
    //Create and Return Observable
    private Observable<ProductsInfo> getProductObservable(ServiceRequest serviceRequest, String productId ) {
    return Observable.create((rx.Subscriber<? super ProductsInfo> s) -> {
        ProductsInfo prdct = getProduct(serviceRequest, productId);
        s.onNext(prdct);
        s.onCompleted();
    }).subscribeOn(Schedulers.io());
    }
    //Call End Service
    private ProductsInfo getProduct(ServiceRequest serviceRequest, String productId)    {
        serviceRequest.setProductID(productId);
        //service invocation
        svcResponse = getProductEndPoint.getProducts(serviceRequest);//**********/
        ProductsInfo product = serviceMapper.productMapper(svcResponse);
        return product;
    }   

我想并行调用ProductComponent.getProducts(ServiceRequest serviceRequest, List<String> productIds)以及上述2个服务。

不要使用create(OnSubscribe),背压和退订可能很难兑现。但是,当您使用create时,您没有向订阅者s发出任何项目,因此单个项目永远不会到达。

fromCallable:

代替create
Observable<BillInfoResponse> getBillInfoRsp(ServiceRequest serviceRequest) {
    return Observable.fromCallable(() -> 
        billInfoComponent.getBillingDetails(serviceRequest));
}

然后继续进行zip,就像你做的那样。如果可观察对象代表网络调用,你可能还想将.timeout添加到网络调用的可观察对象中,以便在调用挂起时提前失败。

在并行运行网络调用可观察对象时,在单个可观察对象上使用.subscribeOn(Schedulers.io()).subscribeOn(Schedulers.from(ExecutorService))

例如,并行压缩abc:

Scheduler scheduler = Schedulers.io();
a
 .zipWith(b.subscribeOn(scheduler))
 .zipWith(c.subscribeOn(scheduler))
 .subscribe(subscriber);

相关内容

  • 没有找到相关文章

最新更新