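I am trying to call two different REST services in parallel; they return different response types. I used Observable.zip(...).toBlocking().single() for this, but it never returns.
Here is what I am doing:
- Using Rx zip to call the different services in parallel
The services I am trying to call are: 1. orderInfo 2. billInfo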
@Service
public class ParallelServiceCallImpl {
    ...//var declaration ...
    Observable<OrderInfoResponse> getOrderInfoRsp(ServiceRequest serviceRequest) {
        return Observable.create((rx.Subscriber<? super OrderInfoResponse> s) -> {
            OrderInfoResponse ordrInfo = orderComponent.getOrderDetails(serviceRequest); // this component calls to end service
        });
    }

    Observable<BillInfoResponse> getBillInfoRsp(ServiceRequest serviceRequest) {
        return Observable.create((rx.Subscriber<? super BillInfoResponse> s) -> {
            BillInfoResponse billInfo = billInfoComponent.getBillingDetails(serviceRequest); // this component calls to end service
        });
    }

    public ServiceEndResponse getServiceBillFinance(ServiceRequest serviceRequest) {
        Observable<OrderInfoResponse> orderObservable = getOrderInfoRsp(serviceRequest);
        Observable<BillInfoResponse> billObservable = getBillInfoRsp(serviceRequest);
        Observable<ServiceEndResponse> responseObservable = Observable.zip(
            orderObservable,
            billObservable,
            (ordrInfo, billInfo) -> {
                ServiceEndResponse serviceEndResponse = serviceMapper.endResponseMapper(ordrInfo, billInfo);
                return serviceEndResponse;
            }
        );
        ServiceEndResponse serviceResponse = responseObservable.toBlocking().single(); // Not getting response
        return serviceResponse;
    }
}
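Please let me know what I am missing here and why it does not return serviceResponse.
Update
I want to add one more thing to the above: ProductComponent has a getProducts method that calls the ProductInfo service in parallel for all productIds.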
public List<ProductsInfo> getProducts(ServiceRequest serviceRequest, List<String> productIds) {
    List<ProductsInfo> lstProductInfo = new ArrayList<ProductsInfo>();
    List<Observable<ProductsInfo>> lstPrd = new ArrayList<Observable<ProductsInfo>>(productIds.size());
    for (String prdID : productIds) {
        Observable<ProductsInfo> prd = getProductObservable(serviceRequest, prdID); //Observable
        lstPrd.add(prd);
    }
    Iterable<Observable<ProductsInfo>> iterable = (Iterable<Observable<ProductsInfo>>) lstPrd;
    List<ProductsInfo> listofProd = Observable.zip(iterable, new FuncN<List<ProductsInfo>>() {
        public List<ProductsInfo> call(Object... args) {
            for (Object arg : args) { lstProductInfo.add((ProductsInfo) arg); }
            return lstProductInfo;
        }
    }).toBlocking().single();
    return lstProductInfo;
}

//Create and Return Observable
private Observable<ProductsInfo> getProductObservable(ServiceRequest serviceRequest, String productId) {
    return Observable.create((rx.Subscriber<? super ProductsInfo> s) -> {
        ProductsInfo prdct = getProduct(serviceRequest, productId);
        s.onNext(prdct);
        s.onCompleted();
    }).subscribeOn(Schedulers.io());
}

//Call End Service
private ProductsInfo getProduct(ServiceRequest serviceRequest, String productId) {
    serviceRequest.setProductID(productId);
    //service invocation
    svcResponse = getProductEndPoint.getProducts(serviceRequest);//**********/
    ProductsInfo product = serviceMapper.productMapper(svcResponse);
    return product;
}
I want to call ProductComponent.getProducts(ServiceRequest serviceRequest, List<String> productIds) in parallel along with the two services above.
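Don't use create(OnSubscribe); backpressure and unsubscription can be difficult to honor. More to the point, in your create calls you never emit anything to the subscriber s (no onNext, no onCompleted), so no item ever arrives and single() blocks forever.
Use fromCallable instead of create: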
Observable<BillInfoResponse> getBillInfoRsp(ServiceRequest serviceRequest) {
    return Observable.fromCallable(() ->
        billInfoComponent.getBillingDetails(serviceRequest));
}
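To make the difference concrete, here is a minimal sketch, assuming RxJava 1.x is on the classpath. The class EmitDemo, its methods, and the String payload are hypothetical stand-ins for the real response types; the 200 ms timeout is only there so the demo terminates instead of hanging like the code in the question.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import rx.Observable;

public class EmitDemo {

    // Mirrors the question: the work is done, but nothing is passed to the subscriber.
    static Observable<String> silent() {
        return Observable.create((rx.Subscriber<? super String> s) -> {
            String result = "order"; // computed, then dropped: no onNext, no onCompleted
        });
    }

    // fromCallable emits the returned value and then completes.
    static Observable<String> emitting() {
        return Observable.fromCallable(() -> "order");
    }

    // Returns true if no item arrived before the timeout fired.
    static boolean timesOut(Observable<String> source) {
        try {
            source.timeout(200, TimeUnit.MILLISECONDS).toBlocking().single();
            return false;
        } catch (RuntimeException e) {
            // the blocking call rethrows the checked TimeoutException wrapped in a RuntimeException
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println("silent times out:   " + timesOut(silent()));
        System.out.println("emitting times out: " + timesOut(emitting()));
    }
}
```

Without the timeout, single() on the silent observable would simply never return, which is the behavior described in the question.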
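Then proceed with zip as you did. If the observables represent network calls, you may also want to add .timeout to them so that a hanging call fails early.
To run the network-call observables in parallel, use .subscribeOn(Schedulers.io()) or .subscribeOn(Schedulers.from(ExecutorService)) on each individual observable.
For example, to zip a, b and c in parallel: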
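Scheduler scheduler = Schedulers.io();
// a needs its own subscribeOn too; if it is a blocking source such as fromCallable,
// it would otherwise run on the subscribing thread before b and c are subscribed.
// combineAB and combineABC are placeholder Func2 combiners (zipWith requires one).
a.subscribeOn(scheduler)
    .zipWith(b.subscribeOn(scheduler), combineAB)
    .zipWith(c.subscribeOn(scheduler), combineABC)
    .subscribe(subscriber);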
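Putting the pieces together for the three calls in the question, the following is a rough sketch rather than a drop-in implementation. It assumes RxJava 1.x; ParallelZipSketch, async, fetchOrder, fetchBill and fetchProduct are hypothetical names, the String payloads stand in for the real response types, and the 5 second timeout is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class ParallelZipSketch {

    // Hypothetical stand-ins for the blocking service calls.
    static String fetchOrder() { pause(100); return "order"; }
    static String fetchBill() { pause(100); return "bill"; }
    static String fetchProduct(String id) { pause(100); return "product-" + id; }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Wraps a blocking call so it runs on an io() thread and fails if it takes too long.
    static <T> Observable<T> async(Callable<T> call) {
        return Observable.fromCallable(call)
                .subscribeOn(Schedulers.io())
                .timeout(5, TimeUnit.SECONDS);
    }

    static Observable<String> product(String id) {
        return async(() -> fetchProduct(id));
    }

    static String fetchAll(List<String> productIds) {
        Observable<String> order = async(ParallelZipSketch::fetchOrder);
        Observable<String> bill = async(ParallelZipSketch::fetchBill);
        // flatMap subscribes to every per-product observable at once;
        // the resulting list follows completion order, not input order.
        Observable<List<String>> products = Observable.from(productIds)
                .flatMap(id -> product(id))
                .toList();

        return Observable.zip(order, bill, products,
                (o, b, p) -> o + "+" + b + "+" + p.size() + " products")
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("p1", "p2")));
    }
}
```

Here flatMap followed by toList replaces zip over an Iterable with FuncN for the product calls; that is a swapped-in alternative, not something the answer above prescribes, and it avoids mutating a shared list inside the zip function.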