Reactive Extensions OnNext线程安全



使用Rx Subject,从多个线程调用OnNext()是否线程安全?

因此,序列可以从多个来源生成。

合并会做同样的事情吗?

Rx合约要求通知是连续的,并且对于几个操作员来说是逻辑上的必要条件。也就是说,您可以使用可用的Synchronize方法来获得这种行为。

var subject = new Subject<int>();
var syncedSubject = Subject.Synchronize(subject);            

现在,您可以对syncedSubject进行并发调用。对于必须同步的观察者,您也可以使用:

var observer = Observer.Create<Unit>(...);
var syncedObserver = Observer.Synchronize(observer);

测试:

Func<int, Action> onNext = i => () => syncedSubject.OnNext(i);
Parallel.Invoke
(
    onNext(1),
    onNext(2),
    onNext(3),
    onNext(4)
);

否,序列是连续的,因此不允许重叠通知。您可以使用Synchronize扩展方法来强制执行正确的同步。像Merge这样的运算符使用锁来调用下游观察器,以确保对on*回调进行正确的串行调用。

调用someSubject.OnNext()someList.Add()一样是线程安全的——您可以从>1个线程调用它,但不能同时调用。用lock语句包住你的OnNext,它就安全了。

相关内容

  • 没有找到相关文章

最新更新