如何使Rx回调在ThreadPool上运行?



您是否期望下面的程序打印False?

using System;
using System.Threading;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
public static class Program
{
public static void Main()
{
Observable
.Return(1)
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread))
.Wait();
}
}

输出:

False

我的理解是ThreadPoolScheduler是为了在ThreadPool上安排工作,但显然这不是发生的事情。它的名称可能是指Rx内部的其他线程池,而不是指System.Threading名称空间中的实际ThreadPool类。

我做了各种尝试来强制回调在ThreadPool上运行,但没有成功。我尝试过的一些事情:

.ObserveOn(Scheduler.Default)
.ObserveOn(DefaultScheduler.Instance)
.ObserveOn(TaskPoolScheduler.Default)
.ObserveOn(new TaskPoolScheduler(Task.Factory))
.ObserveOn(new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)))
ThreadPoolScheduler.Instance.DisableOptimizations(); // At the start of the program

上面的最后一次尝试是在微软论坛上看到这个问题之后。无论我尝试了什么,Thread.IsThreadPoolThread属性一直返回false

我应该写我自己的IScheduler实现来确保我的代码在ThreadPool上运行吗?对于这样一个微不足道的目标,这听起来是一项相当重要的任务。

。. NET 5.0.1,系统。react 5.0.0, c# 9

你分享的链接实际上有一个解决方案:

var modScheduler = ThreadPoolScheduler.Instance.DisableOptimizations(new[] { typeof(ISchedulerLongRunning) });
Observable
.Return(1)
.ObserveOn(modScheduler)
.Select(_ => Thread.CurrentThread)
.Subscribe(t => Console.WriteLine(t.IsThreadPoolThread));

如果你看ThreadPoolScheduler的源代码,你会看到所有的工作都被发送到ThreadPool,除了ScheduleLongRunning。如果你禁用这个优化,那么所有的工作都将被发送到ThreadPool。

最新更新