使用rxjava生成无限序列的无限序列



我正在尝试使用rxjava编写一个简单的程序来生成无限的自然数序列。因此,到目前为止,我找到了两种方法,可以使用 observable.timer() and observable.interval()生成数字序列。我不确定这些功能是否是解决此问题的正确方法。我期望像我们在Java 8中具有的简单功能会产生无限的自然数。

intstream.iterate(1,value-> value 1).foreach(system.out :: println);

我尝试使用Intstream与可观察的,但这无法正常工作。它仅向第一个订户发送无限的数字流。如何正确生成无限的自然数序列?

import rx.Observable;
import rx.functions.Action1;
import java.util.stream.IntStream;
public class NaturalNumbers {
    public static void main(String[] args) {
        Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
            IntStream stream = IntStream.iterate(1, val -> val + 1);
            stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
        });
        Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
        Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
        Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
        naturalNumbers.subscribe(first);
        naturalNumbers.subscribe(second);
        naturalNumbers.subscribe(third);
    }
}

问题是,在naturalNumbers.subscribe(first);上,您实现的OnSubscribe被称为,并且您正在通过无限流进行forEach,因此为什么您的程序永远不会终止。

您可以处理它的一种方法是异步将它们在其他线程上订阅。为了轻松查看结果,我必须将睡眠引入流处理:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
    IntStream stream = IntStream.iterate(1, i -> i + 1);
    stream.peek(i -> {
        try {
            // Added to visibly see printing
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }).forEach(subscriber::onNext);
});
final Subscription subscribe1 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(first);
final Subscription subscribe2 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(second);
final Subscription subscribe3 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(third);
Thread.sleep(1000);
System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");

Observable.Generate正是操作员反应地解决此类问题。我还假设这是一个教学示例,因为无论如何使用迭代可能会更好。

您的代码在订户的线程上产生整个流。由于它是无限流,因此subscribe调用将永远无法完成。除了这个明显的问题外,由于您没有在循环中检查它,因此取消订阅也将是有问题的。

您想使用调度程序来解决此问题 - 当然不要使用subscribeOn,因为那会给所有观察者带来负担。安排每个数字的交付到onNext-作为每个计划动作的最后一步,安排下一个。

本质上,这是Observable.generate给您的 - 每次迭代均在提供的调度程序上(默认为默认值为如果您不指定的话,则会引入并发)。可以取消调度程序操作并避免线程饥饿。

rx.net这样解决(实际上有一个更好的async/await模型,但在Java Afaik中不可用):

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(0, (i, self) =>
        {
            if (i < count)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
                self(i + 1);
            }
            else
            {
                observer.OnCompleted();
            }
        });
   });
}

这里要注意的两件事:

  • 呼叫日程安排返回的订阅句柄,该订阅句柄将传递回观察者
  • 时间表是递归的 - self参数是用于调用下一个迭代的调度程序的引用。这允许取消订阅取消操作。

不确定在rxjava中的外观,但是这个想法应该相同。同样,Observable.generate对您来说可能会更简单,因为它旨在照顾这种情况。

创建无限序列时应注意:

  1. 在不同的线程上订阅并观察;否则,您只会服务单个订户
  2. 订阅终止后立即停止生成值;否则,失控的循环会吃掉您的CPU

第一个问题是通过使用subscribeOn()observeOn()和各种调度程序来解决的。

使用库提供的方法Observable.generate()Observable.fromIterable()最好解决第二个问题。他们确实进行了正确的检查。

检查以下内容:

Observable<Integer> naturalNumbers =
        Observable.<Integer, Integer>generate(() -> 1, (s, g) -> {
            logger.info("generating {}", s);
            g.onNext(s);
            return s + 1;
        }).subscribeOn(Schedulers.newThread());
Disposable sub1 = naturalNumbers
        .subscribe(v -> logger.info("1 got {}", v));
Disposable sub2 = naturalNumbers
        .subscribe(v -> logger.info("2 got {}", v));
Disposable sub3 = naturalNumbers
        .subscribe(v -> logger.info("3 got {}", v));
Thread.sleep(100);
logger.info("unsubscribing...");
sub1.dispose();
sub2.dispose();
sub3.dispose();
Thread.sleep(1000);
logger.info("done");

相关内容

  • 没有找到相关文章