RxJava:如何使用Observable替代嵌套循环



现在我使用带有ReplaySubject的嵌套循环来获得多个可观察性,这需要在不使用任何循环的情况下进行转换,只使用可观察器。

private Observable<BaseUnit> getUnitObservables(ArrayList<Map<Integer, Integer>> list, ReplaySubject<BaseUnit> subject) {
ArrayList<Observable<BaseUnit>> observables = new ArrayList<>();

for (Map<Integer, Integer> elem : list) {
for (MapOperationName operationName : MapOperationName.values()) { // MapOperationName type is enum
Observable<BaseUnit> observable = Observable.fromCallable(() -> {
// some operations here
subject.onNext(unit);
observables.add(observable);
});
}
}
return Observable.merge(observables);
}

"CCD_ 2";具有3个元素。"CCD_ 3";返回7个元素。所以我在上面的例子中得到的应该是21。

试图重构这样的东西,但只得到3个元素:

private Observable<BaseUnit> getUnitObservables(ArrayList<Map<Integer, Integer>> list, ReplaySubject<BaseUnit> subject) {
Observable<ListOperationName> observableOperation = 
Observable.fromArray(MapOperationName.values());
Observable<List<Integer>> observableList = Observable.fromIterable(list);
return Observable.zip(observableList, observableOperation, (listElem, operationElem) -> {
subject.onNext(unit);
//some operations here
});
}

如何正确组合这个ArrayListEnum,得到完整的配对组合?

您可以将需要执行的操作嵌套在flatMap中以实现这一点:

private Observable<BaseUnit> getUnitObservables(ArrayList<Map<Integer, Integer>> list, ReplaySubject<BaseUnit> subject) {
Observable<ListOperationName> observableOperation = 
Observable.fromArray(MapOperationName.values());
Observable<List<Integer>> observableList = Observable.fromIterable(list);
return observableList
.flatMap(listElem -> observableOperation
.map(operationElem -> {
// some operations here
subject.onNext(unit);
return unit;  // Assuming unit is a BaseUnit
})
);
}

我对将ReplaySubject传递给您的方法并不感兴趣。相反,您可以将您的方法作为源代码,并让您的subject订阅它:

private Observable<BaseUnit> getUnitObservables(ArrayList<Map<Integer, Integer>> list) {
return Observable.fromIterable(list)
.flatMap(listElem -> Observable.fromArray(MapOperationName.values())
.map(operationElem -> {
// some operations here
return new BaseUnit();
})
);
}
...
...
Subject<BaseUnit> subject = ReplaySubject.create();
subject
.mergeWith(getUnitObservables(listParam))
.subscribe();

最新更新