BlockingCollection与作为消费者使用的Subject



我正在尝试在c#中实现一个消费者。有许多发布者可以同时执行。我创建了三个例子,一个带有Rx和主题,一个带有BlockingCollection,第三个使用来自BlockingCollection的tooobservable。在这个简单的例子中,它们都做同样的事情,我希望它们与多个制作人一起工作。

每种方法的不同品质是什么?

我已经在使用Rx了,所以我更喜欢这种方法。但我担心OnNext没有线程安全保证,我不知道Subject和默认调度器的队列语义是什么。

是否存在线程安全主题?

是否所有消息都将被处理?

是否有其他的场景,当这不起作用?是并发处理吗?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}

不是Rx,但很容易适应使用/订阅它。它获取一个项目,然后处理它。这应该连续发生。

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

使用阻塞集合有点像主题似乎是一个很好的折衷方案。我猜隐式将调度到任务上,这样我就可以使用async/await,是正确的吗?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

主题不是线程安全的。并发发布的onnext将直接并发调用Observer。就我个人而言,考虑到Rx的其他领域执行正确语义的程度,我觉得这相当令人惊讶。我只能假设这样做是出于性能考虑。

主题是一种中途的房子,虽然,在它强制终止与OnError或OnComplete -在这些被提出后,OnNext是一个NOP。这种行为线程安全的。

但是在Subject上使用Observable.Synchronize(),它将强制发出调用遵守适当的Rx语义。特别是,OnNext调用将阻塞如果并发进行。

底层机制是标准的。net锁。当锁被多个线程争用时,大多数情况下,它们以先到先服务的方式获得锁。在某些情况下,公平是被违反的。但是,您肯定会得到您正在寻找的序列化访问。

ObserveOn具有特定于平台的行为-如果可用,您可以提供SynchronizationContext和OnNext调用被张贴到它。使用调度器,它最终会将调用放到ConcurrentQueue<T>上,并通过调度器对它们进行串行调度——因此执行线程将依赖于调度器。无论哪种方式,排队行为都将强制执行正确的语义。

在这两种情况下(Synchronize &观察),您当然不会丢失消息。使用ObserveOn,你可以通过选择Scheduler/Context来隐式地选择你将处理消息的线程,使用Synchronize你将在调用线程上处理消息。哪一个更好取决于你的场景。

还有更多需要考虑的——比如,如果生产商的发展速度超过了消费者,你该怎么办。

您可能也想看一下Rxx Consume: http://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703

显示同步行为的示例代码(Nuget Rx-Testing, Nunit) -这对线程来说有点做作。睡眠代码,但它是相当繁琐的坏,我是懒惰的:):

public class SubjectTests
{
    [Test]
    public void SubjectDoesNotRespectGrammar()
    {
        var subject = new Subject<int>();
        var spy = new ObserverSpy(Scheduler.Default);
        var sut = subject.Subscribe(spy);
        // Swap the following with the preceding to make this test pass
        //var sut = subject.Synchronize().Subscribe(spy);
        Task.Factory.StartNew(() => subject.OnNext(1));
        Task.Factory.StartNew(() => subject.OnNext(2));
        Thread.Sleep(2000);
        Assert.IsFalse(spy.ConcurrencyViolation);
    }
    private class ObserverSpy : IObserver<int>
    {
        private int _inOnNext;
        public ObserverSpy(IScheduler scheduler)
        {
            _scheduler = scheduler;
        }
        public bool ConcurrencyViolation = false;
        private readonly IScheduler _scheduler;
        public void OnNext(int value)
        {
            var isInOnNext = Interlocked.CompareExchange(ref _inOnNext, 1, 0);
            if (isInOnNext == 1)
            {
                ConcurrencyViolation = true;
                return;
            }
            var wait = new ManualResetEvent(false);
            _scheduler.Schedule(TimeSpan.FromSeconds(1), () => wait.Set());
            wait.WaitOne();
            _inOnNext = 0;
        }
        public void OnError(Exception error)
        {
        }
        public void OnCompleted()
        {
        }
    }
}

相关内容

  • 没有找到相关文章