RxJs 中的 Observable.expand() 的 RxJava 等价物是什么?



在RxJs中,有一个名为Observable.expand的方法,它可以使用转换函数递归扩展序列。

例如

Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); }) 

将发出所有整数

但是,我在RxJava中找不到此方法。RxJava中还有其他方法可以实现类似的目标吗?

有关 exapand() 的更详细规范,请参阅 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.md

好吧,你给出的例子可以简化为:

Observable.range(0, Integer.MAX_VALUE)

但我假设你真的想做一些更复杂的事情。 scan与您正在寻找的内容并不相同,但它可以做类似的事情,我们可以使用它来制作一个Transformer,您可以重复使用类似的扩展。

scan 的主要区别在于,它每一步都需要一个新的输入值,但与 expand 一样,它保留了以前的值。我们可以通过忽略新的输入值来实现这一目标。因为扫描类似于扩展,所以我将从scan的例子开始,它有一些很大的缺陷,然后探索一个更好的选择。

public class Expand<T, T> implements Transformer<T, T> {
  private final Func1<T, T> expandFunc;
  public Expand(final Func1<T, T> expandFunc) {
    this.initialValue = initialValue;
    this.expandFunc = expandFunc;
  }
  @Override
  public Observable<T> call(Observable<T> source) {
    // Here we treat emissions from the source as a new 'initial value'.
    // NOTE: This will effectively only allow one input from the source, since the
    // output Observable expands infinitely. If you want it to switch to a new expanded
    // observable each time the source emits, use switchMap instead of concatMap.
    return source.concatMap(new Func1<T, Observable<T>>() {
      @Override
      public Observable<T> call(T initialValue) {
        // Make an infinite null Observable, just for our next-step signal.
        return Observable.<Void>just(null).repeat()
            .scan(initialValue, new Func2<T, Void, T>() {
              @Override
              public T call(final T currentValue, final Void unusedSignal) {
                return expandFunc.call(currentValue);
              }
            });
      }
    });
  }
}

为了使用该转换器,让我们创建一个方法,该方法采用当前数字,将 1 相加,然后对其进行平方。

Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() {
  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }
});

无论如何,您可能注意到了这种方法的一些主要尴尬点。首先,是 switch vs. concatMap 的事情,以及它本质上如何将一个项目从可观察输出转变为无限链。其次,整个"空洞"信号不应该是可观察的。当然,我们可以使用rangejust(1).repeat()或许多其他东西,但它们最终还是会被扔掉。

这是一种可以更干净、更递归地对其进行建模的方法。

public static <T> Observable<T> expandObservable(
    final T initialValue, final Func1<T, T> expandFunc) {
  return Observable.just(initialValue)
      .concatWith(Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
          return expandObservable(expandFunc.call(initialValue), expandFunc);
        }
      });
}

因此,在此示例中,每个递归传递输出当前值(在每个步骤上展开,并与下一步连接。 defer 用于防止无限递归立即发生,因为它在订阅之前不会调用代码来创建 Observable。使用它看起来像这样:

expandObservable(1, new Func1<Integer, Integer>() {
  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }
}).subscribe(/** do whatever */);

因此,很像compose示例,但实现更整洁、更干净。

希望有帮助。

我想你正在寻找map

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map

numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.map({it * it}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
4
9
16
25
Sequence complete

最新更新