rx-java-RxJava:窗口/缓冲区过载问题,与rx.NET不兼容



我正在阅读http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html#SequencesOfCoincidence并在那里找到了关于CCD_ 1算子的以下部分:

"这些复杂重载中的第一个允许我们控制窗口何时关闭。每次创建窗口时都会调用windowClosingSelector函数。窗口是在订阅时创建的,并且在窗口关闭后立即创建;当windowClosingElector的序列产生值时,窗口就会关闭。该值被忽略,因此无论什么类型的seqence值为;事实上您只需从windowClosingSelector完成序列即可关闭窗口。"

这似乎与public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)方法的文档和行为不同。我的测试表明,生成关闭可观察对象的Func0只调用过一次,如上所述完成这项可观察的工作也不调用过——只创建过一个窗口。该方法的工作原理实际上与public final <U> Observable<Observable<T>> window(Observable<U> boundary)方法相同,只是边界是直接传递的,而不是通过函数传递的。从源代码来看,后一种/更简单的方法只是通过将传递的边界封装到Func0中来调用前一种/较复杂的方法,Func0只返回它

问题:

  1. RxJava的public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)的行为似乎是错误的,除非这本书是错误的或者RxJava兼容的Rx.NET的新版本发生了变化。这是Rx.NET和RxJava之间故意的不兼容吗?

  2. 如果不兼容是正确的行为,为什么RxJava有两个window重载,它们实际上做得差不多?如果一个函数不需要任何参数并且只被调用一次,那么我看不出用一个创建窗口关闭可观察的函数来重载有任何明显的好处。好的,它使得可观察到的懒惰的创建,但我认为这可以通过defer方法来实现。

  3. 更令人困惑的是,两个过载的大理石图略有不同(箭头是)。也许我只是不明白其中的真正区别?

对于buffer方法,存在相同的两个重载。

以下是一些代码,或多或少是上述文章中的一个端口。每一行输入都会关闭当前窗口并打开一个新窗口,除非它是"q"(不区分大小写),在这种情况下,整个过程就会结束。

Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS);
PublishSubject<Object> closer = PublishSubject.create();
Subscription s = source
        .window(() -> {
            System.out.println("!!! creating window closer");
            return closer;
        })
        .subscribe(new Subscriber<Observable<Long>>() {
            private int idx = 0;
            @Override
            public void onNext(Observable<Long> window) {
                ++idx;
                System.out.printf("+++ starting new window%n");
                String windowName = "window " + idx;
                window.subscribe(new Subscriber<Long>() {
                    @Override
                    public void onNext(Long aLong) {
                        System.out.printf("%s -> %d%n", windowName, aLong);
                    }
                    @Override
                    public void onError(Throwable e) {
                        // nothing
                    }
                    @Override
                    public void onCompleted() {
                        System.out.printf("--- %s completed%n", windowName);
                    }
                });
            }
            @Override
            public void onError(Throwable e) {
                // nothing
            }
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        });
Scanner scanner = new Scanner(System.in);
String input;
do {
    input = scanner.nextLine();
    closer.onNext(input);
    //closer.onCompleted();
} while (!"q".equalsIgnoreCase(input));
s.unsubscribe();

window0替换closer.onNext()行会中断应用程序—只创建一个窗口。

根据建议,我在Github页面上提出了这个问题,并且已经解决了:https://github.com/ReactiveX/RxJava/issues/3053.

最新更新