我正在尝试使用rxjava编写一个简单的程序来生成无限的自然数序列。因此,到目前为止,我找到了两种方法,可以使用 observable.timer() and observable.interval()生成数字序列。我不确定这些功能是否是解决此问题的正确方法。我期望像我们在Java 8中具有的简单功能会产生无限的自然数。
intstream.iterate(1,value-> value 1).foreach(system.out :: println);
我尝试使用Intstream与可观察的,但这无法正常工作。它仅向第一个订户发送无限的数字流。如何正确生成无限的自然数序列?
import rx.Observable;
import rx.functions.Action1;
import java.util.stream.IntStream;
public class NaturalNumbers {
public static void main(String[] args) {
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, val -> val + 1);
stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
});
Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
naturalNumbers.subscribe(first);
naturalNumbers.subscribe(second);
naturalNumbers.subscribe(third);
}
}
问题是,在naturalNumbers.subscribe(first);
上,您实现的OnSubscribe
被称为,并且您正在通过无限流进行forEach
,因此为什么您的程序永远不会终止。
您可以处理它的一种方法是异步将它们在其他线程上订阅。为了轻松查看结果,我必须将睡眠引入流处理:
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, i -> i + 1);
stream.peek(i -> {
try {
// Added to visibly see printing
Thread.sleep(50);
} catch (InterruptedException e) {
}
}).forEach(subscriber::onNext);
});
final Subscription subscribe1 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(first);
final Subscription subscribe2 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(second);
final Subscription subscribe3 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(third);
Thread.sleep(1000);
System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");
Observable.Generate
正是操作员反应地解决此类问题。我还假设这是一个教学示例,因为无论如何使用迭代可能会更好。
您的代码在订户的线程上产生整个流。由于它是无限流,因此subscribe
调用将永远无法完成。除了这个明显的问题外,由于您没有在循环中检查它,因此取消订阅也将是有问题的。
您想使用调度程序来解决此问题 - 当然不要使用subscribeOn
,因为那会给所有观察者带来负担。安排每个数字的交付到onNext
-作为每个计划动作的最后一步,安排下一个。
本质上,这是Observable.generate
给您的 - 每次迭代均在提供的调度程序上(默认为默认值为如果您不指定的话,则会引入并发)。可以取消调度程序操作并避免线程饥饿。
rx.net这样解决(实际上有一个更好的async/await
模型,但在Java Afaik中不可用):
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(0, (i, self) =>
{
if (i < count)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
self(i + 1);
}
else
{
observer.OnCompleted();
}
});
});
}
这里要注意的两件事:
- 呼叫日程安排返回的订阅句柄,该订阅句柄将传递回观察者
- 时间表是递归的 -
self
参数是用于调用下一个迭代的调度程序的引用。这允许取消订阅取消操作。
不确定在rxjava中的外观,但是这个想法应该相同。同样,Observable.generate
对您来说可能会更简单,因为它旨在照顾这种情况。
创建无限序列时应注意:
- 在不同的线程上订阅并观察;否则,您只会服务单个订户
- 订阅终止后立即停止生成值;否则,失控的循环会吃掉您的CPU
第一个问题是通过使用subscribeOn()
,observeOn()
和各种调度程序来解决的。
使用库提供的方法Observable.generate()
或Observable.fromIterable()
最好解决第二个问题。他们确实进行了正确的检查。
检查以下内容:
Observable<Integer> naturalNumbers =
Observable.<Integer, Integer>generate(() -> 1, (s, g) -> {
logger.info("generating {}", s);
g.onNext(s);
return s + 1;
}).subscribeOn(Schedulers.newThread());
Disposable sub1 = naturalNumbers
.subscribe(v -> logger.info("1 got {}", v));
Disposable sub2 = naturalNumbers
.subscribe(v -> logger.info("2 got {}", v));
Disposable sub3 = naturalNumbers
.subscribe(v -> logger.info("3 got {}", v));
Thread.sleep(100);
logger.info("unsubscribing...");
sub1.dispose();
sub2.dispose();
sub3.dispose();
Thread.sleep(1000);
logger.info("done");