RxJava中发出值的代码和接收值的代码的线程执行



我有以下代码:

private static void log(Object msg) {
System.out.println(
Thread.currentThread().getName() +
": " + msg);
}
Observable<Integer> naturalNumbers = Observable.create(emitter -> {
log("Invoked"); // on main thread
Runnable r = () -> {
log("Invoked on another thread");
int i = 0;
while(!emitter.isDisposed()) {
log("Emitting "+ i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));

这里有两个重要的lambda表达式。第一个是我们传递给Observable.create的,第二个是我们传给Observable.subscribe((的回调。在第一个lambda中,我们创建一个新线程,然后在该线程上发出值。在第二个lambda中,我们有代码来接收第一个lambda代码中发出的值。我观察到这两个代码都是在同一个线程上执行的。

Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2

为什么会这样?RxJava是否默认在同一线程上运行代码发射值(observable(和代码接收值(observer(?

让我们看看,如果使用Thread执行可运行的:,会发生什么

测试

@Test
void threadTest() throws Exception {
log("main");
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(
() -> {
log("thread");
countDownLatch.countDown();
})
.start();
countDownLatch.await();
}

输出

main: main
Thread-0: thread

看起来,主入口点是从main线程调用的,而新创建的Thread被称为Thread-0

为什么会这样?RxJava是否默认在同一线程上运行代码发射值(observable(和代码接收值(observer(?

默认情况下,RxJava是单线程的。因此,如果observeOnsubscribeOn或不同的线程布局没有对生产者进行不同的定义,生产者将在consumer(subscriber(线程上发出值。这是因为默认情况下RxJava运行订阅堆栈上的所有内容。

示例2

@Test
void fdskfkjsj() throws Exception {
log("main");
Observable<Integer> naturalNumbers =
Observable.create(
emitter -> {
log("Invoked"); // on main thread
Runnable r =
() -> {
log("Invoked on another thread");
int i = 0;
while (!emitter.isDisposed()) {
log("Emitting " + i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
Thread.sleep(100);
}

输出2

main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1

在您的示例中,很明显,main方法是从主线程调用的。此外,subscribeActual调用也在调用线程(main(上运行。但是Observable#createlambda从新创建的线程Thread-0调用onNext。该值是从调用线程推送到订阅者的。在这种情况下,调用线程是Thread-0,因为它在下游用户上调用onNext

如何区分生产者和消费者

使用observeOn/subscribeOn运算符来处理RxJava中的并发。

我应该使用低级线程构造吗ẁ使用RxJava

不,您不应该使用new Thread来区分生产者和消费者。onNext不能同时调用(交错(,因此很容易破坏合同。这就是为什么RxJava提供了一种称为Scheduler的具有Workers的构造以减轻这种错误。

注:我认为这篇文章描述得很好:http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html。请注意,这是Rx.NET,但原理完全相同。如果你想了解RxJava的并发性,你也可以看看Davids博客(https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html)或阅读本书(RxJava的反应式编程https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/)

相关内容

  • 没有找到相关文章

最新更新