实施Rx处理程序的最佳实践是什么



我有这个类来解释我的问题:

public class DataObserver: IDisposable
{
private readonly List<IDisposable> _subscriptions = new List<IDisposable>();
private readonly SomeBusinessLogicServer _server;
public DataObserver(SomeBusinessLogicServer server, IObservable<SomeData> data)
{
_server = server;
_subscriptions.Add(data.Subscribe(TryHandle));
}
private void TryHandle(SomeData data)
{
try
{
_server.MakeApiCallAsync(data).Wait();
}
catch (Exception)
{
// Handle exceptions somehow!
}
}
public void Dispose()
{
_subscriptions.ForEach(s => s.Dispose());
_subscriptions.Clear();
}

}

A) 如何避免TryHandle()函数内部的阻塞?

B) 您将如何发布在该函数中捕获的异常以正确处理它们?

Rx设计指南在编写自己的Rx运算符时提供了许多有用的建议:

http://go.microsoft.com/fwlink/?LinkID=205219

我确信我会因为链接到一篇外部文章而受到抨击,但这个链接已经好几年了,而且太大了,无法在SO上重新发布。

首先,看看CompositeDisposable,而不是自己重新实现它。

除此之外,你的问题还有很多答案。我发现,在使用Rx时,我得到的最好的见解是意识到,在大多数情况下,你想订阅的实际上只是你正在构建的可观察对象中的更多链,你并不真的想订阅,而是想对传入的可观察内容应用另一种转换。让一些进一步"处于系统边缘"并对如何处理错误有更多了解的代码来实际订阅

在您介绍的示例中:

A) 不要仅仅通过将IObservable<SomeData>转换为IObservable<Task>(它实际上更好地表示为IObservable<IObservable<Unit>>)来进行阻塞。B) 通过以错误结束可观察项来发布异常,或者,如果不希望异常结束可观察项,则公开IObservable<Exception>

以下是我如何重写您的示例,假设您不希望流以错误结束,而是在报告错误后继续运行:

public static class DataObserver
{
public static IObservable<Exception> ApplyLogic(this IObservable<SomeData> source, SomeBusinessLogicServer server)
{
return source
.Select(data =>
{
// execute the async method as an observable<Unit>
// ignore its results, but capture its error (if any) and yield it.
return Observable
.FromAsync(() => server.MakeApiCallAsync(data))
.IgnoreElements()
.Select(_ => (Exception)null) // to cast from IObservable<Unit> to IObservable<Exception>
.Catch((Exception e) => Observable.Return(e));
})
// runs the Api calls sequentially (so they will not run concurrently)
// If you prefer to let the calls run in parallel, then use
// .Merge() instead of .Concat()
.Concat() ;
}
}

// Usage (in Main() perhaps)
IObservable<SomeData> dataStream = ...;
var subscription = dataStream.ApplyLogic(server).Subscribe(error =>
{
Console.WriteLine("An error occurred processing a dataItem: {0}", error);
}, fatalError =>
{
Console.WriteLine("A fatal error occurred retrieving data from the dataStream: {0}", fatalError);
});

相关内容

  • 没有找到相关文章