我在RxJava中创建了一个Subject
实例,并从多个线程调用它的onNext()
:
PublishSubject<String> subject = PublishSubject.create();
//...
subject.onNext("A"); //thread A
subject.onNext("B"); //thread B
RxJava文档说:
注意不要从多个线程调用它的
onNext( )
方法(或它的其他on方法),因为这可能导致非序列化调用,这违反了可观察对象契约,并在结果Subject
中产生歧义。
- 我是否必须在这样的
Subject
上调用toSerialized()
假设我不在乎"A"
是在"B"
之前还是之后?序列化有什么帮助? - 是
Subject
线程安全的方式还是我会打破RxJava没有toSerialized()
? - 文档中提到的"Observable contract"是什么?
我是否必须在这样的主题上调用toSerialized()假设我不在乎是否"A"在"& & "之前还是之后?
是的,使用toSerialized()
,因为所有应用于对象的操作符都假设正确的序列化发生在上游。如果不这样做,流可能会失败或产生意想不到的结果。
是主题线程安全的方式还是我会打破RxJava没有toSerialized()?
回答上面
什么是"可观察契约"?文档中提到的?
Rx Design Guidelines.pdf section 4定义了Observable契约:
4.2。假设以序列化方式调用观察者实例
由于Rx使用推送模型,而。net支持多线程,因此不同的消息可能同时到达不同的执行上下文。如果可观察序列的消费者必须在每个地方处理这个问题,那么他们的代码将需要执行大量的内务处理,以避免常见的并发问题。以这种方式编写的代码将更难维护,并可能遭受性能问题。
我认为RxJava文档应该使这更容易发现,所以我将提出一个问题。
根据Dave的回答,如果您事先知道您的主题将从不同的线程访问,您可以将其包装成SerializedSubject
http://reactivex.io/RxJava/javadoc/rx/subjects/SerializedSubject.html
包装一个Subject,这样可以安全地从不同的线程调用它的各种on方法。
:private final Subject<Object, Object> bus = new SerializedSubject<Object, Object>(PublishSubject.create());
(摘自Ben Christensen的EventBus示例:http://bl.ocks.org/benjchristensen/04eef9ca0851f3a5d7bf)