请观察以下单元测试:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace UnitTests
{
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, e => Assert.Fail(), () =>
{
Assert.AreEqual(100, i);
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, () =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
tcs.TrySetResult(null);
}, Assert.Fail);
tcs.Task.Wait();
}
}
}
单元测试SubscribeCancel
和SubscribeThrow
超时,因为OnError
回调从未被调用,因此对任务的等待从未结束。
怎么了?
p.S.
这个问题与如何正确地用IOobservable包装SqlDataReader有关?
编辑
与此同时,我创建了一个新的Rx问题-https://rx.codeplex.com/workitem/74
此外http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the-given-subscriber?forum=rx
EDIT2
以下观察者实现产生了完全相同的结果,尽管它符合Rx设计指南的第6.5段——"订阅实现不应抛出":
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
try
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
obs.OnCompleted();
}
catch (Exception exc)
{
obs.OnError(exc);
}
});
}
EDIT3
我开始相信,当异步可观察序列被集成到同步代码中时,应该编写这样的代码(通常是服务器端的情况):
var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
try
{
...
}
catch (Exception e)
{
DoErrorLogic();
tcs.TrySetException(e);
}
}, e =>
{
DoErrorLogic();
tcs.TrySetException(e);
}, () =>
{
DoCompletedLogic();
tcs.TrySetResult(null);
});
tcs.Task.Wait();
真的是这样吗?
编辑4
我想你想说的话终于开始从我生疏的脑子里流下来了。我现在将转到我的另一篇文章-如何正确地用IObservable包装SqlDataReader?
这种行为是经过设计的。如果订阅者抛出异常(顺便说一句,这是一种糟糕的做法),Rx框架会正确地解释它已经死了,并且不再与它通信。如果订阅被取消,这也不是错误——只是请求不发送任何类型的进一步事件——Rx对此表示尊重。
编辑以回应评论
我认为在文档中没有一个简单的参考——你所看到的行为是如此内在,以至于是隐含的。我能得到的最接近的是指向AnonymousSafeObserver和AutoDetatchObserver的源代码。后者有一个可能会有所帮助的解释性场景,但它有点涉及。
也许一个类比会有所帮助。想象一下,数据流事件是由报刊代理发送的报纸。用户是家庭。
订阅者抛出异常
报务员愉快地送报,直到有一天,其中一位订户——琼斯先生——开着煤气,他的房子爆炸了,杀死了琼斯先生,摧毁了房子(抛出了未经处理的例外)。报刊代理商意识到,他无法再向Jones先生递送报纸,也无法发送终止通知,报纸供应也没有问题(因此OnError或OnCompleted不合适),报刊代理商只剩下一名订户。
与此形成对比的是,报纸印刷商无意中使用了易燃墨水,并将工厂付之一炬。现在,可怜的报刊代理商必须向所有的订户发送一份解释性说明(OnError),因为订阅已经无限期停止。
订阅者取消订阅
Mr。琼斯从订阅中收到报纸,直到有一天他觉得自己厌倦了铺天盖地的令人沮丧的故事,并要求取消订阅。报务员有义务。他没有给琼斯先生发一张纸条,解释该报已经停止印刷版本(没有OnCompleted)——他们没有。他也没有给琼斯先生发一张纸条,解释报纸已经倒闭(没有OnError)——他只是按照琼斯先生的要求停止送报。
对Edit3的响应
我同情你的斗争。我注意到在您的代码中,您一直在尝试将TPL(任务)习惯用法与Rx的习惯用法相结合。这种尝试往往让人觉得笨拙,因为它们确实是截然不同的世界。很难对这样一段话发表评论:
我开始相信,当异步可观察序列集成到同步代码中时,应该编写这样的代码(通常是服务器端的情况):
与Brandon的精心断言非常一致,我想不出在服务器端以您尝试的方式将异步代码与同步代码集成在一起真的合适的例子。这对我来说就像是一种设计的味道。通常,人们会尝试保持代码的反应性——进行订阅,并让订阅者反应性地处理工作。我不记得有必要像您描述的那样转换为同步代码。
当然,看看您在Edit3中编写的代码,还不清楚您想要实现什么。源不负责对订阅者中的错误作出反应。这是摇狗的尾巴。为了确保订阅服务器服务的连续性,需要存在的异常处理程序应该在订阅处理代码中,而不是在可观察的源中——它应该只关心保护自己免受恶意观察者行为的影响。这种逻辑在上面链接的AnonymousSafeObserver中实现,并被大多数Rx提供的操作员使用。可观察对象很可能有逻辑来处理其源数据的连续性,但这是另一个问题,而不是您在代码中要解决的问题。
无论您试图通过调用ToTask
或Wait
桥接到同步代码,都可能需要仔细考虑您的设计。
我觉得,提供一个更具体的问题陈述——也许是从你试图解决的现实世界场景中提取的——将有助于为你提供更有用的建议。"SqlDataReader"示例中您说。。。
最后,人们可能会通过订阅observable[包装SqlDataReader]来直接使用它,但在某个时候他们将不得不等待结束(阻塞线程),因为周围的大多数代码仍然是同步的。
。。。突出了你所处的设计困境。在这种情况下,正如你所推断的那样,这些消费者显然最好使用IEnumerable<T>
接口,或者可能要求使用IObservable<List<T>>
。但关键是要从更大的角度来看,你试图将SqlDataReader包装在一个可观察的包装器中,这是一种设计气味,因为这是对特定一次性请求的固定数据供应。这可能是一个异步场景,但不是一个真正的反应场景。与更典型的反应性场景相比;只要你收到股票X的价格,就给我寄来";在那里,您完全按照源的要求设置未来的数据流,以便订阅者做出反应。
指南中没有明确说明,但Rx语法和IObservables
的目的暗示了这一点。IObservables将源的信息传递给一个或多个观察者。通信的信息是数据(通过OnNext
),可选地后面跟着OnCompleted
或OnError
。重要的是要记住,这些回调是由源触发的。它们不能也不应该作为观察者的结果而被触发。
如果OnError
被调用,那将是因为源可观察链中的某些东西发生了故障。这永远不会是因为一个观察者失败了。
在您的SubscribeThrow
示例中,观察器(由您为OnNext
、OnError
和OnCompleted
提供的3个Lambda构建)出现故障。观察者中的这种错误不能也不应该导致可观察源本身失败。
RX 2.0引入了保障措施以确保该合同。阅读RX 2.0版本博客文章的"改进错误处理策略"部分。
相关问题:使用ObserveOn时,如何在OnNext中处理异常?
第3版
这当然是一种方法,但它相当丑陋。首先,我将挑战您的断言,即异步服务器端代码通常需要同步才能与某些同步代码交互。我发现只有在单元测试中才是这样。
但不管怎样,我相信你订阅得太早了。我对Rx的经验是,每当我遇到摩擦时,都是因为我订阅得太早了,而应该扩展可观察的monad链。
在您的示例中,与其订阅数据流并在您的观察者中处理它,不如将您的处理器逻辑视为传入数据的另一个投影。在这种情况下,您的逻辑只是将一段数据转换为工作结果。这允许您将逻辑的成功或失败视为流的一部分,然后您可以按照自己的意愿进行观察。你最终得到的是:
var data = GetObservable();
var results = data.Select(item =>
{
DoWork(item);
// since your work does not produce anything...
// it either succeeds or throws an exception
// and you cannot make an Observable<void>
// return Unit.Default. Unit is the Rx equivalent of
// void.
return Unit.Default;
});
// subscribe to the stream and wait synchronously for it to finish
results.Wait(); // this will throw an exception the first time DoWork fails
// or asynchronously await the stream to finish...just like a Task
await results;
// or turn the stream into a Task that completes when the processing is complete.
var task = results.ToTask();
或者,如果您不想停止处理第一个错误,而是只想累积错误,该怎么办。这很容易,因为你认为你的工作是一个投影。。。
var results = data.Select(item =>
{
try
{
DoWork(item);
return null; // no error
}
catch (Exception e)
{
return e;
}
}).Where(e => e != null).ToList();
var errorList = results.Wait();
// or var errorList = await results;
// or Task<List<Exception>> errorListTask = results.ToTask();
这两个例子看起来都简单明了,只要以不同的方式思考问题就有可能。