RxJava SerializedObserver implementation



在阅读David Karnok关于RxJava内部结构的文章时,我遇到了一个类似于RxJava的SerializedObserver类的实现示例。下面是代码:

class ValueListEmitterLoop<T> {
    List<T> queue;                           
    boolean emitting;
    Consumer<? super T> consumer;
    public void emit(T value) {
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();   
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        consumer.accept(value);              
        for (;;) {
             List<T> q;
             synchronized (this) {           
                 q = queue;
                 if (q == null) {            
                     emitting = false;
                     return;
                 }
                 queue = null;               
             }
             q.forEach(consumer);            
        }        
    }
}

那么问题是为什么在第一个synchronized块中引入内部变量q ?我清楚地看到了第二个synchronized块背后的原因。有没有什么原因让我错过了不只是使用:

if (queue == null) {
    queue = new ArrayList<>();
}
queue.add(value);

我发现将字段读入局部变量是一个很好的做法,特别是如果它们被多次使用并且附近有一些易失性/同步访问。

例如,下面是一个常见的模式:

volatile boolean cancelled;
final Queue<T> queue;
final Subscriber<? super T> actual;
void drain() {
    Subscriber<? super T> a = actual;
    Queue<T> q = queue;
    for (;;) {
        if (cancelled) {
            return;
        }
        T v = q.poll();
        if (v == null) {
             a.onComplete();
             return;
        }
        a.onNext(v);
    }
}

如果aq是字段访问,处理器/JVM将不得不一直从内存中读取它们,因为cancelled的易失性访问和poll()中的类似原子。

相关内容

  • 没有找到相关文章

最新更新