响应式扩展:订阅者内的并发性



我正试图将我的头脑围绕响应式扩展对并发性的支持,并且很难获得我所追求的结果。所以我可能还没有明白

我有一个源,它向流发送数据的速度比订阅者使用它的速度快。我更愿意配置流,以便使用另一个线程为流中的每个新项目调用订阅者,这样订阅者就有多个线程并发地通过它运行。我可以确保订阅者的线程安全。

下面的示例演示了这个问题:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });

控制台输出如下所示(删除日期):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6

请注意"观察值"时间delta。即使源继续发送数据的速度比订阅服务器处理数据的速度快,也不会并行调用订阅服务器。虽然我可以想象当前行为有用的许多场景,但我需要能够在消息可用时立即处理它们。

我已经尝试了几种调度器的变化与ObserveOn方法,但他们似乎都没有做我想要的。

除了在Subscribe操作中旋转一个线程来执行长时间运行的工作之外,我还缺少什么可以将数据并发交付给订阅者的东西吗?

提前感谢所有的答案和建议!

这里最根本的问题是,你希望Rx可观察对象以一种真正打破可观察对象工作规则的方式来分派事件。我认为看看这里的Rx设计指南会很有启发意义:http://go.microsoft.com/fwlink/?LinkID=205219——最值得注意的是,"4.2假设观察者实例是以序列化的方式调用的"。也就是说,你不意味着能够并行运行OnNext调用。事实上,Rx的排序行为是其设计哲学的核心。

如果你看源代码,你会看到Rx在ObserveOnObserver<T>派生的ScheduledObserver<T>类中强制这种行为…onnext从一个内部队列调度,每个onnext必须在下一个被调度之前完成——在给定的执行上下文中。Rx不允许单个用户的OnNext调用并发执行。

这并不是说你不能让多个订阅者以不同的速率执行。事实上,这很容易看到,如果你改变你的代码如下:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now,
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default);
var subscription1 = source.Subscribe(x =>
    {
        Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
                            DateTime.Now,
                            Thread.CurrentThread.ManagedThreadId, x);
        Thread.Sleep(1000); // Simulate long work time
    });
var subscription2 = source.Subscribe(x =>
{
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
                        DateTime.Now,
                        Thread.CurrentThread.ManagedThreadId, x);
    Thread.Sleep(5000); // Simulate long work time
});

现在您将看到订阅用户1领先于订阅用户2。

你不能轻易做到的是要求一个可观察对象做一些事情,比如将OnNext调用分派给一个"就绪"的订阅者——这是你以一种迂回的方式要求的。我也假设你不会真的想在缓慢的消费情况下为每个OnNext创建一个新线程!

在这种情况下,听起来您可能更适合使用单个订阅者,它除了尽可能快地将工作推送到队列之外什么都不做,然后由许多消费工作线程提供服务,然后您可以根据需要控制以保持同步。

相关内容

  • 没有找到相关文章