我正试图将我的头脑围绕响应式扩展对并发性的支持,并且很难获得我所追求的结果。所以我可能还没有明白。
我有一个源,它向流发送数据的速度比订阅者使用它的速度快。我更愿意配置流,以便使用另一个线程为流中的每个新项目调用订阅者,这样订阅者就有多个线程并发地通过它运行。我可以确保订阅者的线程安全。
下面的示例演示了这个问题:
Observable.Interval( TimeSpan.FromSeconds(1))
.Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
控制台输出如下所示(删除日期):
4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6
请注意"观察值"时间delta。即使源继续发送数据的速度比订阅服务器处理数据的速度快,也不会并行调用订阅服务器。虽然我可以想象当前行为有用的许多场景,但我需要能够在消息可用时立即处理它们。
我已经尝试了几种调度器的变化与ObserveOn方法,但他们似乎都没有做我想要的。
除了在Subscribe操作中旋转一个线程来执行长时间运行的工作之外,我还缺少什么可以将数据并发交付给订阅者的东西吗?
提前感谢所有的答案和建议!
这里最根本的问题是,你希望Rx可观察对象以一种真正打破可观察对象工作规则的方式来分派事件。我认为看看这里的Rx设计指南会很有启发意义:http://go.microsoft.com/fwlink/?LinkID=205219——最值得注意的是,"4.2假设观察者实例是以序列化的方式调用的"。也就是说,你不意味着能够并行运行OnNext调用。事实上,Rx的排序行为是其设计哲学的核心。
如果你看源代码,你会看到Rx在ObserveOnObserver<T>
派生的ScheduledObserver<T>
类中强制这种行为…onnext从一个内部队列调度,每个onnext必须在下一个被调度之前完成——在给定的执行上下文中。Rx不允许单个用户的OnNext调用并发执行。
这并不是说你不能让多个订阅者以不同的速率执行。事实上,这很容易看到,如果你改变你的代码如下:
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default);
var subscription1 = source.Subscribe(x =>
{
Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(1000); // Simulate long work time
});
var subscription2 = source.Subscribe(x =>
{
Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
现在您将看到订阅用户1领先于订阅用户2。
你不能轻易做到的是要求一个可观察对象做一些事情,比如将OnNext调用分派给一个"就绪"的订阅者——这是你以一种迂回的方式要求的。我也假设你不会真的想在缓慢的消费情况下为每个OnNext创建一个新线程!
在这种情况下,听起来您可能更适合使用单个订阅者,它除了尽可能快地将工作推送到队列之外什么都不做,然后由许多消费工作线程提供服务,然后您可以根据需要控制以保持同步。