使用 ReactiveExtensions 并行处理集合的值



我是Rx的新手。根据文档,看起来可以在不同的线程上为每个值并行运行 OnNext 处理程序。我只需要观察适当的调度程序实现。

但是这个简单的代码是同步运行的:

var dict = new Dictionary<int, object> {{1, null}, {5, null}, {6, null}};
dict.ToObservable()
  .SubscribeOn(ThreadPoolScheduler.Default)
  .Subscribe(kv => {
     Console.WriteLine("Thread {0}, key {1}", Thread.CurrentThread.ManagedThreadId, kv.Key);
     Thread.Sleep(1000);
  });

在屏幕上,我看到每次迭代都有相同的线程 ID。我在这里错过了什么?

Rx 定义了一种语法,该语法明确禁止 OnNext 从任何给定观察者的角度重叠。可以并行调用流的多个订阅者(但这取决于 Rx 运算符的实现者)。

在这里,我们有两个订阅者以来自同一流的不同速率处理 OnNext:

void Main()
{
    var stream = Observable.Interval(TimeSpan.FromSeconds(1));    
    var sub1 = stream.Subscribe(x => {
        Console.WriteLine("Sub1 handler start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(4000);
        Console.WriteLine("Sub1 handler end");
    });
    var sub2 = stream.Subscribe(x => {
        Console.WriteLine("Sub2 handler start: " + Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
        Console.WriteLine("Sub2 handler end");
    });
    Console.ReadLine();
}

这是输出,看看 Sub2 如何领先于 Sub1,并且每个都在自己的线程上。

Sub2 handler start: 18
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub2 handler start: 18
Sub1 handler end
Sub1 handler start: 12
Sub2 handler end
Sub2 handler start: 18
Sub2 handler end
Sub1 handler end
Sub2 handler start: 18
Sub1 handler start: 12

请注意,没有什么可说的,每个订阅都会获得自己的线程 - 这取决于调度程序和运算符的实现方式。只要它们符合 OnNext* (OnError |完成)任何事情都会发生。

对于您的特定情况,我会研究 PLINQ/TPL - 感觉它比 Rx 更适合。

顺便说一句,如果你刚刚起步,李坎贝尔的 www.introtorx.com 是一个很好的资源。

相关内容

  • 没有找到相关文章

最新更新