我有以下代码:
private static void log(Object msg) {
System.out.println(
Thread.currentThread().getName() +
": " + msg);
}
Observable<Integer> naturalNumbers = Observable.create(emitter -> {
log("Invoked"); // on main thread
Runnable r = () -> {
log("Invoked on another thread");
int i = 0;
while(!emitter.isDisposed()) {
log("Emitting "+ i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));
这里有两个重要的lambda表达式。第一个是我们传递给Observable.create的,第二个是我们传给Observable.subscribe((的回调。在第一个lambda中,我们创建一个新线程,然后在该线程上发出值。在第二个lambda中,我们有代码来接收第一个lambda代码中发出的值。我观察到这两个代码都是在同一个线程上执行的。
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2
为什么会这样?RxJava是否默认在同一线程上运行代码发射值(observable(和代码接收值(observer(?
让我们看看,如果使用Thread
执行可运行的:,会发生什么
测试
@Test
void threadTest() throws Exception {
log("main");
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(
() -> {
log("thread");
countDownLatch.countDown();
})
.start();
countDownLatch.await();
}
输出
main: main
Thread-0: thread
看起来,主入口点是从main
线程调用的,而新创建的Thread
被称为Thread-0
。
为什么会这样?RxJava是否默认在同一线程上运行代码发射值(observable(和代码接收值(observer(?
默认情况下,RxJava
是单线程的。因此,如果observeOn
、subscribeOn
或不同的线程布局没有对生产者进行不同的定义,生产者将在consumer
(subscriber(线程上发出值。这是因为默认情况下RxJava
运行订阅堆栈上的所有内容。
示例2
@Test
void fdskfkjsj() throws Exception {
log("main");
Observable<Integer> naturalNumbers =
Observable.create(
emitter -> {
log("Invoked"); // on main thread
Runnable r =
() -> {
log("Invoked on another thread");
int i = 0;
while (!emitter.isDisposed()) {
log("Emitting " + i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
Thread.sleep(100);
}
输出2
main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
在您的示例中,很明显,main方法是从主线程调用的。此外,subscribeActual
调用也在调用线程(main
(上运行。但是Observable#create
lambda从新创建的线程Thread-0
调用onNext
。该值是从调用线程推送到订阅者的。在这种情况下,调用线程是Thread-0
,因为它在下游用户上调用onNext
。
如何区分生产者和消费者
使用observeOn
/subscribeOn
运算符来处理RxJava
中的并发。
我应该使用低级线程构造吗ẁ使用RxJava
不,您不应该使用new Thread
来区分生产者和消费者。onNext
不能同时调用(交错(,因此很容易破坏合同。这就是为什么RxJava
提供了一种称为Scheduler
的具有Worker
s的构造以减轻这种错误。
注:我认为这篇文章描述得很好:http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html。请注意,这是Rx.NET,但原理完全相同。如果你想了解RxJava
的并发性,你也可以看看Davids博客(https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html)或阅读本书(RxJava的反应式编程https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/)