我想探索使用IObservable<T>
作为SqlDataReader
的包装器的可能性。到目前为止,我们使用读取器来避免在内存中具体化整个结果,并且我们使用阻塞同步API来实现这一点。
现在我们想尝试将异步API和.NET反应式扩展结合使用。
然而,由于采用异步方式是一个渐进的过程,因此此代码必须与同步代码共存。
我们已经知道,这种同步和异步的混合在ASP.NET中是不起作用的,因为整个请求执行路径必须始终是异步的。一篇关于这个主题的优秀文章是http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html
但我说的是一个简单的WCF服务。我们已经在那里混合了异步和同步代码,但这是我们第一次希望引入Rx,并且存在问题。
我创建了简单的单元测试(我们使用mstest,叹息:-()来演示这些问题。我希望有人能解释我发生了什么。请在下面找到整个源代码(使用Moq):
using System;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
namespace UnitTests
{
public static class Extensions
{
public static Task<List<T>> ToListAsync<T>(this IObservable<T> observable)
{
var res = new List<T>();
var tcs = new TaskCompletionSource<List<T>>();
observable.Subscribe(res.Add, e => tcs.TrySetException(e), () => tcs.TrySetResult(res));
return tcs.Task;
}
}
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static DbDataReader CreateDataReader(int count = 100, int msWait = 10)
{
var curItemIndex = -1;
var mockDataReader = new Mock<DbDataReader>();
mockDataReader.Setup(o => o.ReadAsync(It.IsAny<CancellationToken>())).Returns<CancellationToken>(ct => Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
{
++curItemIndex;
return true;
}
Trace.WriteLine(curItemIndex);
return false;
}));
mockDataReader.Setup(o => o[0]).Returns<int>(_ => curItemIndex);
mockDataReader.CallBase = true;
mockDataReader.Setup(o => o.Close()).Verifiable();
return mockDataReader.Object;
}
private static IObservable<int> GetObservable(DbDataReader reader)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
using (reader)
{
while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
{
obs.OnNext((int)reader[0]);
}
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToListAsyncResult()
{
var reader = CreateDataReader();
var numbers = GetObservable(reader).ToListAsync().Result;
CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableToList()
{
var reader = CreateDataReader();
var numbers = GetObservable(reader).ToEnumerable().ToList();
CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEach()
{
var reader = CreateDataReader();
int i = 0;
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
}
Assert.AreEqual(100, i);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEachBreak()
{
var reader = CreateDataReader();
int i = 0;
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
break;
}
}
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEachThrow()
{
var reader = CreateDataReader();
int i = 0;
try
{
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}
Assert.Fail();
}
catch (Exception exc)
{
Assert.AreEqual("xo-xo", exc.Message);
Mock.Get(reader).Verify(o => o.Close());
}
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, () =>
{
Assert.AreEqual(100, i);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetException(e);
}, () =>
{
Assert.IsTrue(i < 100);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
}
}
这些单元测试捕获了API返回包装数据读取器的IObservable<T>
的所有可能用途:
- 人们可能希望使用我们的
ToListAsync
扩展方法或.ToEnumerable().ToList()
来完全实现它 - 人们可能希望使用
ToEnumerable
扩展方法对其进行迭代。是的-如果消耗很快,它会阻止,如果消耗很慢,它会在内部队列中具体化数据,但这种情况仍然是合法的 - 最后,人们可以通过订阅observable来直接使用它,但在某个时候他们将不得不等待结束(阻塞线程),因为周围的大多数代码仍然是同步的
一个基本要求是,一旦读取结束,无论以何种方式消耗可观察数据,都应立即处理数据读取器。
在所有单元测试中,有4个失败:
SubscribeCancel
和SubscribeThrow
超时(即死锁)- CCD_ 9和CCD_
数据读取器处理验证失败是一个时间问题——当foreach
被留下时(通过异常或中断),相应的IEnumerator
会立即被处理,这最终会取消可观察到的实现所使用的取消令牌。然而,该实现在另一个线程上运行,当它注意到取消时,单元测试已经结束。在实际的应用程序中,阅读器会被正确而及时地处理掉,但它与迭代的结束并不同步。我想知道是否可以使上述IEnumerator
实例的处理等待,直到相应的IObservable
实现注意到取消并且读取器被处理。
编辑
所以DbDataReader
就是IEnumerable
,这意味着如果希望同步枚举对象,那没问题。
但是,如果我想异步执行呢?在这种情况下,我被禁止列举阅读器——这是一种阻塞操作。唯一的出路是返回一个可观测的。其他人已经用比我更好的语言讨论了这个话题,例如——http://www.interact-sw.co.uk/iangblog/2013/11/29/async-yield-return
因此,我必须返回一个IObservable
,并且不能使用ToObservable
扩展方法,因为它取决于读取器的阻塞枚举。
接下来,给定一个IObservable
,有人可能会将其转换为IEnumerable
,这是愚蠢的,因为读者已经是IEnumerable
,但仍然是可行和合法的。
编辑2
使用.NET Reflector(与VS集成)调试代码表明,流通过以下方法:
namespace System.Reactive.Threading.Tasks
{
public static class TaskObservableExtensions
{
...
private static void ToObservableDone<TResult>(Task<TResult> task, AsyncSubject<TResult> subject)
{
switch (task.get_Status())
{
case TaskStatus.RanToCompletion:
subject.OnNext(task.get_Result());
subject.OnCompleted();
return;
case TaskStatus.Canceled:
subject.OnError((Exception) new TaskCanceledException((Task) task));
return;
case TaskStatus.Faulted:
subject.OnError(task.get_Exception().get_InnerException());
return;
}
}
}
}
在异步订阅中,取消令牌和从OnNext
抛出都会进入该方法(以及成功完成)。消去和抛出都收敛于CCD_ 23方法。该方法应该最终委托给OnError
处理程序。但事实并非如此。
编辑3
以下是为什么从给定订阅者抛出时从未调用OnError回调?我现在想知道什么是满足以下目标的正确方法:
- 通过异步读取
SqlDataReader
实例公开可用对象 - 避免对象物化。具体化的选择应该掌握在API的调用方手中
- API应该可以在异步代码与同步代码混合的环境中使用。为什么?因为我们已经有了一个使用同步IO的服务器,我们希望用异步IO逐步淘汰同步阻塞IO
有了这些目标,我想出了这样的东西(参见单元测试代码):
private static IObservable<int> GetObservable(DbDataReader reader)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
using (reader)
{
while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
{
obs.OnNext((int)reader[0]);
}
}
});
}
这对你来说有意义吗?如果没有,有什么替代方案?
接下来,我想使用它,如Subscribe
单元测试代码所示。然而,SubcribeCancel
和SubscribeThrow
的结果表明这种使用模式是错误的。为什么从给定订阅者抛出时从未调用OnError回调?解释为什么它是错误的。
那么,什么是正确的方法呢?如何防止API的消费者不正确地消费(SubcribeCancel
和SubscribeThrow
就是这种不正确消费的例子)。
订阅取消
由于cts
取消,SubscribeCancel
失败。这不会调用OnError
处理程序。
取消您的cts
等同于处理您的订阅。处理订阅会导致忽略所有未来的OnNext
、OnError
和OnCompleted
调用。因此,任务永远不会完成,测试永远挂起。
解决方案:
取消cts时,请将Task设置为其正确状态。
SubscribeThrow
SubscribeThrow
由于OnNext
处理程序中的异常而失败。
在OnNext
处理程序中引发异常不会将异常转发到OnError
处理程序。
解决方案:
不要在Subscribe
处理程序中抛出异常。相反,处理您的订阅并将Task
设置为其正确状态。
ToEnumerableForEachThrow&ToEnumerableForEachBreak
CCD_ 44和CCD_。
枚举对象上的foreach(...)
将调用底层可观察对象上的dispose,这将取消取消令牌。在那之后,异常被测试的catch捕获(或者break只是退出foreach),在那里测试以查看底层读取器是否被处理。。。除了阅读器还没有被处理,因为可观察到的东西仍在等待阅读器产生下一个结果。。。只有在读取器让步(以及可观察的让步)之后,可观察的循环才会返回并检查取消令牌。在这一点上,可观察到的中断并退出使用块,并处理读取器。
解决方案:
从Observable.Create
返回一个Disposable
,而不是您的using (...)
语句。可丢弃的将在处理订阅时进行处理。这就是你想要的。一起去掉using
语句,让Rx
来完成它的工作。