RX - 在包含方法中重新引发错误



我需要将 RX 流中的错误 (IObservable) 转换为包含流订阅的方法中的异常

(由于这个问题 https://github.com/aspnet/SignalR/pull/1331,因此错误不会序列化到客户端。解决此问题后,我将恢复正确处理错误

例如

我有以下方法

public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
return _mySvc.ThingChanged();
}

所以我尝试订阅流并重新抛出错误,但它仍然没有传输到客户端:

public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
_mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
return _mySvc.ThingChanged();
}
private void OnError(Exception exception)
{
throw new Exception(exception.Message);
}

我需要的是投入直播方法的等效性

例如,此错误被传播到客户端

public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
throw new Exception("some error message");
return _mySvc.ThingChanged();
}

任何想法如何实现这一目标?

我也发现了这一点,特别是对于"包含的"反应式管道 - 即具有明确定义的开始和结束的管道。在这种情况下,只需允许基础异常冒泡到包含范围就足够了。但正如你所发现的,这个概念对 Rx 来说通常是陌生的:管道中发生的事情会留在管道中。

我在包含场景中发现的唯一方法是使用Catch()将错误"滑出"流中,并交回一个空IObservable以允许流自然停止(否则,如果您正在await完成IObservable,您将挂起)。

这在您的LiveStream()方法不起作用,因为该上下文/范围应该在您使用流之前很久就不存在了。因此,这必须在包含整个管道的上下文中发生。

Exception error = null;
var source = LiveStream()
.Catch<WhatYoureStreaming, Exception>(ex => {error = ex; return Observable.Empty<WhatYoureStreaming>(); })
...
await source; // if this is how you're awaiting completion
// not a real exception type, use your own
if (error != null) throw new ContainingException("oops", error);

只是不要在最后throw error那里,您将丢失原始堆栈跟踪。

试试这段代码:

public IObservable<StreamItem> LiveStream()
{
_mySvc.Start();
return
_mySvc
.ThingChanged()
.Materialize()
.Do(x =>
{
if (x.Kind == NotificationKind.OnError)
{
OnError(x.Exception);
}
})
.Dematerialize();
}

我不确定这是不是最好的方法 - 像这样的抛出异常可能会导致您在流中感到悲伤,最终触发错误的异常处理程序。您可能需要找到其他解决方案。

相关内容

  • 没有找到相关文章

最新更新