NewThreadScheduler.默认安排所有工作在同一个线程上



我目前正试图了解RX.NET的并发性,但却被一些事情弄糊涂了。我想并行运行四个相对较慢的任务,所以我认为NewThreadScheduler.Default是可行的,因为它"表示一个在单独线程上调度每个工作单元的对象。".

这是我的设置代码:

    static void Test()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);
        var query = Enumerable.Range(1, 4);
        var obsQuery = query.ToObservable(NewThreadScheduler.Default);
        obsQuery.Subscribe(DoWork, Done);
        Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }
    static void DoWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
    }
    static void Done()
    {
        Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

我假设"X线程Y"每次都会输出不同的线程id,但实际输出是:

Starting. Thread 1
Last line. Thread 1
1 Thread 3
2 Thread 3
3 Thread 3
4 Thread 3
Done. Thread 3

所有的工作都是按顺序在同一个新线程上进行的,这不是我所期望的。

我想我错过了什么,但我不知道是什么。

可观察查询有两个部分,Query本身和Subscription。(这也是ObserveOn和SubscribeOn运算符之间的区别。)

您的Query

Enumerable
    .Range(1, 4)
    .ToObservable(NewThreadScheduler.Default);

这将创建一个可观察的对象,在该系统的默认NewThreadScheduler上生成值

您的订阅是

obsQuery.Subscribe(DoWork, Done);

QueryOnComplete调用结束时,这将为QueryDone产生的每个值运行DoWork。在实践中,如果查询的所有值都是在同一个线程上生成的,那么订阅方法中的函数将在哪个线程上调用,我认为没有任何保证。他们似乎也在这样做,所以所有的订阅调用都在同一个线程上进行,这很可能是为了消除许多常见的多线程错误。

所以您有两个问题,一个是您的日志记录,如果您将Query更改为

Enumerable
    .Range(1, 4)
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId);
    .ToObservable(NewThreadScheduler.Default);

您将看到在一个新线程上生成的每个值。

另一个问题是Rx的意图和设计。Query是一个长时间运行的过程,而Subscription是一个处理结果的简短方法。如果你想作为Rx Observable运行一个长时间运行的函数,你最好的选择是使用Observable.ToAsync.

相关内容

  • 没有找到相关文章

最新更新