当在EventLoopScheduler
(其工作队列中至少有一个项目(上调用Dispose
时,它将抛出ObjectDisposedException
。从其工作线程引发异常。
我已经看到并阅读了已经存在的两个问题:
- RX2.0:ObjectDisposedException 在处理 EventLoopScheduler 之后,
- Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException after dispose
但是,我认为有些答案并不完全正确,引用Rx关于EventLoopScheduler的介绍:
EventLoopScheduler 实现 IDisposable,调用 Dispose 将允许线程终止。与 IDisposable 的任何实现一样,显式管理所创建资源的生存期是合适的。
来源: http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler
它们提供了有关如何正确使用EventLoopScheduler
的示例:
Observable
.Using(()=>new EventLoopScheduler(), els=> GetPrices(els))
.Subscribe(...)
不幸的是,这个例子不起作用(至少对我来说不是:-(。给定这段代码:
internal class Program
{
private static void Main(string[] args)
{
var source = new Subject<string>();
var subscription = Observable.Using(
() => new EventLoopScheduler(),
scheduler => source
.ObserveOn(scheduler)
.Do(LongRunningAction))
.Subscribe();
source.OnNext("First action (2 seconds)");
Thread.Sleep(TimeSpan.FromSeconds(1));
subscription.Dispose(); // Scheduler is still busy!
Console.ReadLine();
}
private static void LongRunningAction(string text) {
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine(text);
}
}
我希望在 2 秒后看到一条短信而没有任何错误(即使订阅在 1 秒后已处置(。EventLoopScheduler
无法取消正在进行的操作,这对我来说没关系。
您实际得到的是消息和未处理的ObjectDisposedException
。
那么,这是一个错误还是我做错了?:-(
为了解决此异常,我目前包装了EventLoopScheduler
并在wrapper.Dispose()
上调用scheduler.Schedule(() => scheduler.Dispose())
。
我上面的评论与詹姆斯的回答。此"答案"在此处提供"修复"问题的示例代码。
但是,我确实认为EventLoopScheduler存在一个错误。我认为*如果它已被处置,它不应该继续递归地安排工作。
void Main()
{
//In my example GetPrices is the source.
// I meant that you could use an ELS to do some heavy work to get prices.
//var source = new Subject<string>();
var subscription = Observable.Using(
() => new EventLoopScheduler(),
scheduler =>
{
return Observable.Create<string>((obs, ct) =>
{
var scheduleItem = scheduler.Schedule(0, (state,self) => {
//Do work to get price (network request? or Heavy CPU work?)
var price = state.ToString("c");
LongRunningAction(price);
obs.OnNext(price);
//Without this check, we see that the Scheduler will try to
// recursively call itself even when disposed.
if(!ct.IsCancellationRequested)
self(state+1);
});
return Task.FromResult(scheduleItem);
});
})
.Subscribe();
Thread.Sleep(TimeSpan.FromSeconds(1));
subscription.Dispose(); // Scheduler is still busy!
Console.ReadLine();
}
private static void LongRunningAction(string text)
{
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine(text);
}
*但我完全保留在我确信的情况下改变主意的权利。
FWIW :通常,我只使用ELS作为服务中的readonly
字段,我想将线程专用于处理一些入站工作。 例如,我只想使用一个线程从该服务的网络或磁盘读取。在这种情况下,我创建了一个ELS,它将完成任何工作。然后,当释放包含它的类时,将释放它。我认为我根本不会经常使用它,因为 IntroToRx.com 样本显示。
因此,在断章取义地引用后,我不得不做出回应。 :)让我们将报价扩展到重要的部分:
你没有业务处置那个事件循环调度程序!一旦你拥有 将其传递给其他Rx运营商,您已将责任转嫁给其他Rx运营商 为它。
问题是,您正在尝试让观察者(订阅者(清理调度程序。但是观察者将调度程序传递给可观察对象。如果要释放调度程序,则必须考虑它现在"拥有"它的可观察量。可观察者知道:
- 订阅时
- 当它被取消订阅时
- 当它已将OnComplete/OnError发送给其所有订阅者时
有了这些信息,它就可以完全知道何时可以处置它的任何调度程序。(即便如此,如果您尝试进行通用清理,它也需要在终结器中处理调度程序,因为这是它可以保证另一个订阅者在没有特殊知识的情况下不会出现的唯一一点。
但是,个人订户不能保证拥有任何此类信息 - 了解潜在的其他订户,并且何时发送了最后一个事件不会暴露给它。它传递给调度程序的可观察对象可以以各种时髦的方式在它身上聚会:调用睡眠很多的疯狂方法;凭空捏造事件,只是因为它幻想它;将活动推迟到下周二;通过在冰箱上钉一张纸条来回应退订事件,并承诺诚实地去那个 Mañana。
那么,您想每次都安全地清理该调度程序吗?然后你需要让你的可观察对象这样做。
内置运算符不会为此烦恼 - 我怀疑这不被认为是一个大问题,因为它在大多数用例中都没有必要。事实上,我认为我从未见过需要处置EventLoopScheduler
的情况 - 它们一直用于程序的生命周期。很容易陷入认为您需要处理您看到的每个 IDisposable - 但实际上对于 Rx,它通常只是没有必要(尤其是对于订阅,Dispose
它实际上只是取消订阅的请求 - 而不是清理资源的命令。Rx 团队不想在 IDisposable
制作一个非常好的订阅句柄时创建另一个界面。
EventLoopScheduler
在不忙的时候会挂起它的线程 - 所以大多数时候你不需要担心清理,除非你正在创建任意数量的它们(提示:你真的不需要这样做(。
如果您这样做,您可能想查看NewThreadScheduler
是否会这样做,它实际上在幕后使用EventLoopScheduler
,在特殊机密(即内部(模式下,如果调度程序队列为空,则会退出线程 - 但否则会重用它。是的,尽管普遍存在相反的误解,但NewThreadScheduler
确实重用线程,因此在单个订阅者的负载下不会产生很大的相关线程创建成本。只有当多个订阅者在运行时,才会显示多个线程,或者当它空闲时,下一个事件才会导致线程创建。
但是如果你正在使用一个EventLoopScheduler
,你可能在一个地方使用它来将事情绑定到一个全局共享的事件循环(毕竟 - 这就是事件循环通常所做的 - 将事件集中到应用程序中的单个线程上( - 所以清理该线程很少是必要的,因为无论如何当进程死亡时它都会消失。
好的,我有一些工作。但它不是线程安全的,相关的代码行标有注释。猜猜我应该打开一个错误票:-/
private static void Main(string[] args)
{
var originSource = new Subject<string>();
var subscription = UsingEventLoop(originSource)
.Do(LongRunningAction) // runs on EventLoopScheduler thread
.Subscribe();
originSource.OnNext("First action (appears after 2 seconds)");
originSource.OnNext("Second action (must not appear");
Thread.Sleep(TimeSpan.FromSeconds(1));
subscription.Dispose(); // Scheduler is still busy with first action!
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
}
private static IObservable<TValue> UsingEventLoop<TValue>(IObservable<TValue> source)
{
return Observable.Using(
() => new EventLoopScheduler(),
scheduler => Observable.Create<TValue>((obs, ct) =>
{
return Task.FromResult(source.Subscribe(value =>
{
// The following check+call is NOT thread safe!
if (!ct.IsCancellationRequested)
{
scheduler.Schedule(() => obs.OnNext(value));
}
}));
}));
}
private static void LongRunningAction<TValue>(TValue value) {
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine(value);
}