Rx - 使用新线程上的每个项目



假设我有这样的代码:

static void Main(string[] args)
{
var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);
enumerable
.ToObservable(scheduler)
.SubscribeOn(scheduler)
.Subscribe(item =>
{
Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId);
// simulate long running operation
Thread.Sleep(1000);
});
Console.ReadKey();
}

当你把IEnumerable转换为IObservable时。然后我想在新线程上使用每个项目,所以我使用了SubsribeOn(调度程序)。不幸的是,每次迭代都在同一线程上工作,因此下一个迭代块。

结果是:

Consuming 0 on Thread: 4
Consuming 1 on Thread: 4
Consuming 2 on Thread: 4
Consuming 3 on Thread: 4
Consuming 4 on Thread: 4
....

是否有可能强迫这种行为?

你所看到的行为完全是设计使然。

Rx 的基础是它的语法,它声明流被定义为零个或多个OnNext调用的序列,后跟一个可选的OnErrorOnCompleted

特别是,Rx 语法规定这些消息中的每一个都按顺序传递给给定的订阅者

因此,您看到的是正确的行为 - 没有并发执行OnNext处理程序。鉴于这种刻意的约束,为每个OnNext创建一个新线程将非常浪费。

在幕后,如果您通过足够远的距离跟踪代码,您将看到NewThreadScheduler利用专门用于为每个订阅者重用线程的EventLoopScheduler。绰号NewThreadScheduler真正说明了每个订阅者都会获得一个新线程,而不是每个事件。

要看到这一点,请修改您的代码,以便我们有两个以不同速度运行的订阅者。你会看到每个线程都有自己的线程,并按照自己的节奏前进,越快越慢不受阻碍:

var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);
var xs = enumerable
.ToObservable(scheduler)
.SubscribeOn(scheduler);
xs.Subscribe(item =>
{
Console.WriteLine("Slow consuming {0} on Thread: {1}",
item, Thread.CurrentThread.ManagedThreadId);
// simulate slower long running operation
Thread.Sleep(1000);
});
xs.Subscribe(item =>
{
Console.WriteLine("Fast consuming {0} on Thread: {1}",
item, Thread.CurrentThread.ManagedThreadId);
// simulate faster long running operation
Thread.Sleep(500);
});
Console.ReadKey();

您可能会发现通读 Rx 设计指南非常有帮助。

允许在订阅者中并发处理事件的愿望表明,具有多个使用者的队列可能是您所追求的 - 为此,您可以在 Rx 之外查看,例如 BCLConcurrentQueue<T>。 还可以将消息投影到异步调用中,并在完成时收集结果,而不会违反 Rx 语法约束。

例如,这里有一些类似的代码,它们在不同的时间内随机处理流中的每个数字。您可以看到结果顺序不对,并且彼此不受阻碍。这不是很棒的代码,但它说明了问题。如果异步工作是 IO 绑定的东西,它可能真正有用。另请注意使用Observable.Range以避免使用Enumerable.Range().ToObservable()组合。在 .NET Core 2.0 上测试:

var random = new Random();
// stop the threadpool from throttling us as it grows
ThreadPool.SetMinThreads(100, 1);
Observable.Range(0, 100)
.SelectMany(x => Observable.Start(() =>
{
Console.WriteLine($"Started {x}");
Thread.Sleep(random.Next(1, 10) * 1000);
return x;
}))
.Subscribe(item =>
{
Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
});
Console.ReadKey();

相关内容

  • 没有找到相关文章

最新更新