我创建了一个这样的EventLoopScheduler
:
eventLoopScheduler = new EventLoopScheduler(ts =>
new Thread(ts)
{
Name = "UpdatesEventLoop",
IsBackground = true
});
最重要的是,我创建了一个调度程序:
updatesScheduler = eventLoopScheduler.Catch<Exception>(e =>
{
Log.Error("Error on updates thread", e);
return false;
});
现在,我有一个经常发布更新的外部数据源。我监听这些更新并传递它们,最终更新 UI。它看起来像这样:
updatesObservable.Subscribe(x =>
{
//Do some data checking then pass it along
updatesScheduler.Schedule(() =>
{
//subject exists and is just a Subject of type "Update"
subject.OnNext(theUpdate);
});
});
当我启动我的应用程序时,一切正常。 更新流入,updatesObservable
获取它们,传递它们,调度程序立即执行操作。
但是,有时会出错。更新源会在一夜之间反弹,因此数据中断(我不确定这是导致我问题的原因,因为应用程序一旦再次启动就会重新连接到源(。当我早上回到办公室并查看应用程序时,GUI 显示过时的数据,并且没有发生新的更新。
我附加了调试器,我可以看到updatesObservable
正常地从源获取更新并将其传递给调度程序,但是,调度程序从未实际执行Action
(在这种情况下subject.OnNext(theUpdate)
永远不会被调用(。
我很难弄清楚是什么原因导致EventLoopScheduler
停止执行预定的Actions
,因为我不知道它的内部工作原理。
当我试图弄清楚时,我看了一眼eventLoopScheduler
的内部。 我注意到_evt
场(SemaphoreSlim
(有一个非常高的CurrentCount
并且不断增加。 我将这个CurrentCount
值与应用程序按预期运行时看到的值进行了比较,当一切正常时CurrentCount
徘徊在 0-20 左右,有时达到低 100,但很快回落到零。在破碎状态下,CurrentCount
达到 100,000 秒并且永远不会减少。
关于这里可能发生的事情的任何建议?CurrentCount
告诉我什么(不确定它到底代表什么(吗? 这是否确认某些事情不正常?
谢谢。如果有任何不清楚的地方,请告诉我。
扩展我上面的评论,在我看来 - 恐怕我还没有机会确认这一点 - 允许 CatchScheduler 抛出异常会导致 EventLoopScheduler 中的工作线程中止。由于异常不会永久保存到订阅者,因此订阅保持活动状态,但 EventLoopScheduler 会继续排队工作 - 因此 CurrentCount 不断增加 - 但不存在线程来处理查询的项目。
你应该能够证明这一点:a( 在异常处理程序中返回 true 并查看问题是否已解决(尽管发生异常时您的订阅可能会被拆除(,b( 将 EventLoopScheduler 的工作线程置于前台(IsBackground = false(,并查看您的应用程序是否崩溃或,c( 删除调度程序上的 Catch,改用 Observable.Catch。
我假设调度程序的 Catch 扩展方法的作者打算嵌套 Catch 子句,以便异常以与 C# Catch 块相同的方式流动,直到至少在块上处理异常,但我觉得这种行为很危险,这正是您遇到的原因。
请让我知道这是否有帮助,因为我对 CatchScheduler 及其实现很感兴趣。