如何使NewThreadScheduler.默认时间表适用于不同的线程



试图了解Rx,下面的代码似乎无法正常工作,因为我希望它在不同的线程上运行。我错过了什么?

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace OverviewConsoleApp
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Thread {0}", Thread.CurrentThread.ManagedThreadId);
var query = Enumerable.Range(1, 5).Select(n => n);
IObservable<int> observableQuery = query.ToObservable(NewThreadScheduler.Default);
observableQuery.Subscribe(ProcessNumber, ImDone);
}
static void ProcessNumber(int number)
{
Console.WriteLine("{0} Thread {1}", number, Thread.CurrentThread.ManagedThreadId);
}
static void ImDone()
{
Console.WriteLine("I am done!");
}
}
}

新线程调度程序。默认调度同一线程上的所有工作。我正在尝试将它们安排在不同的线程上。

我看到了这个SO的答案,但建议的答案似乎已经过时了,因为IEnumerable不再有Do方法。有人能帮我如何在不同的线程上运行它们吗?

系统。必须安装反应式nuget包才能运行上述程序。

someEnumerable.ToObservable(scheduler)调度可观察到的所有项目在该调度程序上运行。如果你想安排每个单独的项目,你必须把每个项目变成自己的可观察项目。以下内容可以做到这一点:

IObservable<int> observableQuery = query
.ToObservable()
.SelectMany(i => Observable.Return(i).ObserveOn(NewThreadScheduler.Default /*Or TaskPoolScheduler.Default as Asti mentioned */));

对于一个简单的问题,答案并不完全正确。

ToObservable本质上检查名为ScheduleLongRunning的调度程序优化是否可用——顾名思义,这是为了运行可能运行很长时间的单个任务,从而阻塞调度程序。否则,它必须递归地调度枚举,这是一个效率更高的顺序。

NewThreadScheduler是支持ScheduleLongRunning的理想候选者,它只需要在一个新线程上运行整个thunk。

最终的结果是,最终所有的工作都安排在一个线程中。

最后,NewThreadScheduler对于枚举来说是过度的,所以您可能想要切换到TaskPoolScheduler。但是等等,同样的优化在oneTask上运行整个过程。

所以ToObservable(TaskPoolScheduler.Default.DisableOptimizations())

第页。S.

与问题无关,但Do方法以及一大堆有用的运算符都在System.Interactive包中。如果你不想安装一个完整的软件包,Do只是

public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> sideEffect)
{
foreach (var value in source)
{
sideEffect(value);
yield return value;
}
}

最新更新