在RxJs中,有一个名为Observable.expand的方法,它可以使用转换函数递归扩展序列。
例如
Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); })
将发出所有整数
但是,我在RxJava中找不到此方法。RxJava中还有其他方法可以实现类似的目标吗?
有关 exapand() 的更详细规范,请参阅 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.md
好吧,你给出的例子可以简化为:
Observable.range(0, Integer.MAX_VALUE)
但我假设你真的想做一些更复杂的事情。 scan
与您正在寻找的内容并不相同,但它可以做类似的事情,我们可以使用它来制作一个Transformer
,您可以重复使用类似的扩展。
与 scan
的主要区别在于,它每一步都需要一个新的输入值,但与 expand 一样,它保留了以前的值。我们可以通过忽略新的输入值来实现这一目标。因为扫描类似于扩展,所以我将从scan
的例子开始,它有一些很大的缺陷,然后探索一个更好的选择。
public class Expand<T, T> implements Transformer<T, T> {
private final Func1<T, T> expandFunc;
public Expand(final Func1<T, T> expandFunc) {
this.initialValue = initialValue;
this.expandFunc = expandFunc;
}
@Override
public Observable<T> call(Observable<T> source) {
// Here we treat emissions from the source as a new 'initial value'.
// NOTE: This will effectively only allow one input from the source, since the
// output Observable expands infinitely. If you want it to switch to a new expanded
// observable each time the source emits, use switchMap instead of concatMap.
return source.concatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T initialValue) {
// Make an infinite null Observable, just for our next-step signal.
return Observable.<Void>just(null).repeat()
.scan(initialValue, new Func2<T, Void, T>() {
@Override
public T call(final T currentValue, final Void unusedSignal) {
return expandFunc.call(currentValue);
}
});
}
});
}
}
为了使用该转换器,让我们创建一个方法,该方法采用当前数字,将 1 相加,然后对其进行平方。
Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer input) {
final Integer output = input + 1;
return output * output;
}
});
无论如何,您可能注意到了这种方法的一些主要尴尬点。首先,是 switch vs. concatMap 的事情,以及它本质上如何将一个项目从可观察输出转变为无限链。其次,整个"空洞"信号不应该是可观察的。当然,我们可以使用range
或just(1).repeat()
或许多其他东西,但它们最终还是会被扔掉。
这是一种可以更干净、更递归地对其进行建模的方法。
public static <T> Observable<T> expandObservable(
final T initialValue, final Func1<T, T> expandFunc) {
return Observable.just(initialValue)
.concatWith(Observable.defer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
return expandObservable(expandFunc.call(initialValue), expandFunc);
}
});
}
因此,在此示例中,每个递归传递输出当前值(在每个步骤上展开,并与下一步连接。 defer
用于防止无限递归立即发生,因为它在订阅之前不会调用代码来创建 Observable。使用它看起来像这样:
expandObservable(1, new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer input) {
final Integer output = input + 1;
return output * output;
}
}).subscribe(/** do whatever */);
因此,很像compose
示例,但实现更整洁、更干净。
希望有帮助。
我想你正在寻找map
:
https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map
numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.map({it * it}).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
4
9
16
25
Sequence complete