使用 ObserveOn(Scheduler.ThreadPool)
时,当观察者OnNext
抛出错误时,我的应用程序就会终止。我发现解决这个问题的唯一方法是使用下面的自定义扩展方法(除了确保 OnNext 永远不会引发异常)。然后确保每个ObserveOn
后面都有一个ExceptionToError
。
public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
var sub = new Subject<T>();
source.Subscribe(i => {
try {
sub.OnNext(i);
} catch (Exception err) {
sub.OnError(err);
}
}
, e => sub.OnError(e), () => sub.OnCompleted());
return sub;
}
但是,这感觉不对。有没有更好的方法来解决这个问题?
例
由于未捕获的异常,此程序崩溃。
class Program {
static void Main(string[] args) {
try {
var xs = new Subject<int>();
xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
Console.WriteLine(x);
if (x % 5 == 0) {
throw new System.Exception("Bang!");
}
}, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached
xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);
} catch (Exception e) {
Console.WriteLine("Caught : " + e.Message); // <- also not reached
} finally {
Console.ReadKey();
}
}
}
我们将在 Rx v2.0 中解决此问题,从 RC 版本开始。您可以在我们的博客上阅读所有相关信息 http://blogs.msdn.com/rxteam.它基本上归结为管道本身中更规范的错误处理,结合 SubscribeSafe 扩展方法(将订阅期间的错误重定向到 OnError 通道)和 IScheduler 上的 Catch 扩展方法(围绕计划操作包装具有异常处理逻辑的计划程序)。
关于这里提出的ExceptionToError方法,它有一个缺陷。当回调运行时,IDisposable 订阅对象仍可为 null;有一个基本的竞争条件。要解决此问题,您必须使用 SingleAssignmentDisposable。
订阅中的错误和可观察量中的错误之间存在差异。快速测试:
var xs = new Subject<int>();
xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
ex => Console.WriteLine("Error in source: " + ex.Message));
运行这个,你会在源代码中得到一个很好的处理错误:
xs.OnNext(1);
xs.OnNext(2);
xs.OnError(new Exception("from source"));
运行此版本,你将在订阅中收到一个未处理的错误:
xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
解决方案所做的是获取订阅中的错误,并在源中使它们成为错误。您是在原始流上完成此操作的,而不是基于每个订阅。你可能有意这样做,也可能不打算这样做,但这几乎可以肯定是错误的。
执行此操作的"正确"方法是将所需的错误处理直接添加到订阅操作中,这是它所属的位置。如果不想直接修改订阅功能,可以使用一个小帮手:
public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction)
{
return item =>
{
try { action(item); }
catch (System.Exception e) { catchAction(e); }
};
}
现在使用它,再次显示不同错误之间的差异:
xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
ex => Console.WriteLine("Caught error in subscription: " + ex.Message)),
ex => Console.WriteLine("Error in source: " + ex.Message));
现在,我们可以处理源中的(单独)错误和订阅中的错误。当然,这些操作中的任何一个都可以在方法中定义,使上面的代码像(可能)一样简单:
xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler);
编辑
在评论中,我们开始讨论订阅中的错误指向流本身的错误,并且您不希望该流中有其他订阅者。这是一个完全不同的问题类型。我倾向于编写一个可观察的Validate
扩展来处理这种情况:
public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid)
{
return Observable.Create<T>(o => {
return source.Subscribe(
x => {
if (valid(x)) o.OnNext(x);
else o.OnError(new Exception("Could not validate: " + x));
}, e => o.OnError(e), () => o.OnCompleted()
);
});
}
然后简单易用,不混合隐喻(仅在源中出现错误):
xs
.Validate(x => x != 3)
.Subscribe(x => Console.WriteLine(x),
ex => Console.WriteLine("Error in source: " + ex.Message));
如果仍希望Subscribe
禁止异常,则应使用其他讨论的方法之一。
您当前的解决方案并不理想。正如一位Rx人在这里所说:
Rx 运算符不会捕获在调用 OnNext、OnError 或 OnDone 时发生的异常。 这是因为我们希望 (1) 观察者实现者最了解如何处理这些异常,我们无法对它们做任何合理的事情,以及 (2) 如果发生异常,那么我们希望它冒泡出来,而不是由 Rx 处理。
您当前的解决方案让 IObservable 来处理 IObserver 抛出的错误,这没有意义,因为从语义上讲,IObservable 应该不知道观察它的事情。请考虑以下示例:
var errorFreeSource = new Subject<int>();
var sourceWithExceptionToError = errorFreeSource.ExceptionToError();
var observerThatThrows = Observer.Create<int>(x =>
{
if (x % 5 == 0)
throw new Exception();
},
ex => Console.WriteLine("There's an argument that this should be called"),
() => Console.WriteLine("OnCompleted"));
var observerThatWorks = Observer.Create<int>(
x => Console.WriteLine("All good"),
ex => Console.WriteLine("But definitely not this"),
() => Console.WriteLine("OnCompleted"));
sourceWithExceptionToError.Subscribe(observerThatThrows);
sourceWithExceptionToError.Subscribe(observerThatWorks);
errorFreeSource.OnNext(1);
errorFreeSource.OnNext(2);
errorFreeSource.OnNext(3);
errorFreeSource.OnNext(4);
errorFreeSource.OnNext(5);
Console.ReadLine();
在这里,源或观察者没有问题ThatWorks,但由于与另一个观察者无关的错误,将调用其OnError。要阻止不同线程中的异常结束进程,您必须在该线程中捕获它们,因此请在观察器中放置一个 try/catch 块。
我查看了应该解决此问题的本机SubscribeSafe
方法,但我无法使其工作。此方法具有接受IObserver<T>
的单个重载:
// Subscribes to the specified source, re-routing synchronous exceptions during
// invocation of the IObservable<T>.Subscribe(IObserver<T>) method to the
// observer's IObserver<T>.OnError(Exception) channel. This method is typically
// used when writing query operators.
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
IObserver<T> observer);
我尝试传递由 Observer.Create
工厂方法创建的观察器,但 onNext
处理程序中的异常继续使进程¹崩溃,就像它们对正常Subscribe
所做的那样。所以我最终写了我自己的SubscribeSafe
版本。这个接受三个处理程序作为参数,并将onNext
和onCompleted
处理程序引发的任何异常汇集到onError
处理程序。
/// <summary>Subscribes an element handler, an error handler, and a completion
/// handler to an observable sequence. Any exceptions thrown by the element or
/// the completion handler are propagated through the error handler.</summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
// Arguments validation omitted
var disposable = new SingleAssignmentDisposable();
disposable.Disposable = source.Subscribe(
value =>
{
try { onNext(value); } catch (Exception ex) { onError(ex); disposable.Dispose(); }
}, onError, () =>
{
try { onCompleted(); } catch (Exception ex) { onError(ex); }
}
);
return disposable;
}
请注意,onError
处理程序中未处理的异常仍会导致进程崩溃!
¹ 仅在ThreadPool
上异步调用处理程序时引发的异常。
你是对的 - 它应该感觉很糟糕。使用和返回这样的主题不是一个好方法。
至少你应该像这样实现这个方法:
public static IObservable<T> ExceptionToError<T>(this IObservable<T> source)
{
return Observable.Create<T>(o =>
{
var subscription = (IDisposable)null;
subscription = source.Subscribe(x =>
{
try
{
o.OnNext(x);
}
catch (Exception ex)
{
o.OnError(ex);
subscription.Dispose();
}
}, e => o.OnError(e), () => o.OnCompleted());
return subscription;
});
}
请注意,没有使用任何主题,如果我发现错误,我会释放订阅以防止序列继续超过错误。
但是,为什么不在订阅中添加OnError
处理程序。有点像这样:
var xs = new Subject<int>();
xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x =>
{
Console.WriteLine(x);
if (x % 5 == 0)
{
throw new System.Exception("Bang!");
}
}, ex => Console.WriteLine(ex.Message));
xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);
此代码在订阅中正确捕获错误。
另一种方法是使用Materialize
扩展方法,但除非上述解决方案不起作用,否则这可能有点矫枉过正。