我需要将 RX 流中的错误 (IObservable
) 转换为包含流订阅的方法中的异常
(由于这个问题 https://github.com/aspnet/SignalR/pull/1331,因此错误不会序列化到客户端。解决此问题后,我将恢复正确处理错误
例如
我有以下方法
public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
return _mySvc.ThingChanged();
}
所以我尝试订阅流并重新抛出错误,但它仍然没有传输到客户端:
public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
_mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
return _mySvc.ThingChanged();
}
private void OnError(Exception exception)
{
throw new Exception(exception.Message);
}
我需要的是投入直播方法的等效性
例如,此错误被传播到客户端
public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
throw new Exception("some error message");
return _mySvc.ThingChanged();
}
任何想法如何实现这一目标?
我也发现了这一点,特别是对于"包含的"反应式管道 - 即具有明确定义的开始和结束的管道。在这种情况下,只需允许基础异常冒泡到包含范围就足够了。但正如你所发现的,这个概念对 Rx 来说通常是陌生的:管道中发生的事情会留在管道中。
我在包含场景中发现的唯一方法是使用Catch()
将错误"滑出"流中,并交回一个空IObservable
以允许流自然停止(否则,如果您正在await
完成IObservable
,您将挂起)。
这在您的LiveStream()
方法中不起作用,因为该上下文/范围应该在您使用流之前很久就不存在了。因此,这必须在包含整个管道的上下文中发生。
Exception error = null;
var source = LiveStream()
.Catch<WhatYoureStreaming, Exception>(ex => {error = ex; return Observable.Empty<WhatYoureStreaming>(); })
...
await source; // if this is how you're awaiting completion
// not a real exception type, use your own
if (error != null) throw new ContainingException("oops", error);
只是不要在最后throw error
那里,您将丢失原始堆栈跟踪。
试试这段代码:
public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
return
_mySvc
.ThingChanged()
.Materialize()
.Do(x =>
{
if (x.Kind == NotificationKind.OnError)
{
OnError(x.Exception);
}
})
.Dematerialize();
}
我不确定这是不是最好的方法 - 像这样的抛出异常可能会导致您在流中感到悲伤,最终触发错误的异常处理程序。您可能需要找到其他解决方案。