如何正确地用IOobservable包装SqlDataReader



我想探索使用IObservable<T>作为SqlDataReader的包装器的可能性。到目前为止,我们使用读取器来避免在内存中具体化整个结果,并且我们使用阻塞同步API来实现这一点。

现在我们想尝试将异步API和.NET反应式扩展结合使用。

然而,由于采用异步方式是一个渐进的过程,因此此代码必须与同步代码共存。

我们已经知道,这种同步和异步的混合在ASP.NET中是不起作用的,因为整个请求执行路径必须始终是异步的。一篇关于这个主题的优秀文章是http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html

但我说的是一个简单的WCF服务。我们已经在那里混合了异步和同步代码,但这是我们第一次希望引入Rx,并且存在问题。

我创建了简单的单元测试(我们使用mstest,叹息:-()来演示这些问题。我希望有人能解释我发生了什么。请在下面找到整个源代码(使用Moq):

using System;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
namespace UnitTests
{
public static class Extensions
{
public static Task<List<T>> ToListAsync<T>(this IObservable<T> observable)
{
var res = new List<T>();
var tcs = new TaskCompletionSource<List<T>>();
observable.Subscribe(res.Add, e => tcs.TrySetException(e), () => tcs.TrySetResult(res));
return tcs.Task;
}
}
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static DbDataReader CreateDataReader(int count = 100, int msWait = 10)
{
var curItemIndex = -1;
var mockDataReader = new Mock<DbDataReader>();
mockDataReader.Setup(o => o.ReadAsync(It.IsAny<CancellationToken>())).Returns<CancellationToken>(ct => Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
{
++curItemIndex;
return true;
}
Trace.WriteLine(curItemIndex);
return false;
}));
mockDataReader.Setup(o => o[0]).Returns<int>(_ => curItemIndex);
mockDataReader.CallBase = true;
mockDataReader.Setup(o => o.Close()).Verifiable();
return mockDataReader.Object;
}
private static IObservable<int> GetObservable(DbDataReader reader)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
using (reader)
{
while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
{
obs.OnNext((int)reader[0]);
}
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToListAsyncResult()
{
var reader = CreateDataReader();
var numbers = GetObservable(reader).ToListAsync().Result;
CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableToList()
{
var reader = CreateDataReader();
var numbers = GetObservable(reader).ToEnumerable().ToList();
CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEach()
{
var reader = CreateDataReader();
int i = 0;
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
}
Assert.AreEqual(100, i);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEachBreak()
{
var reader = CreateDataReader();
int i = 0;
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
break;
}
}
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEachThrow()
{
var reader = CreateDataReader();
int i = 0;
try
{
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}
Assert.Fail();
}
catch (Exception exc)
{
Assert.AreEqual("xo-xo", exc.Message);
Mock.Get(reader).Verify(o => o.Close());
} 
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, () =>
{
Assert.AreEqual(100, i);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetException(e);
}, () =>
{
Assert.IsTrue(i < 100);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
}
}

这些单元测试捕获了API返回包装数据读取器的IObservable<T>的所有可能用途:

  • 人们可能希望使用我们的ToListAsync扩展方法或.ToEnumerable().ToList()来完全实现它
  • 人们可能希望使用ToEnumerable扩展方法对其进行迭代。是的-如果消耗很快,它会阻止,如果消耗很慢,它会在内部队列中具体化数据,但这种情况仍然是合法的
  • 最后,人们可以通过订阅observable来直接使用它,但在某个时候他们将不得不等待结束(阻塞线程),因为周围的大多数代码仍然是同步的

一个基本要求是,一旦读取结束,无论以何种方式消耗可观察数据,都应立即处理数据读取器。

在所有单元测试中,有4个失败:

  • SubscribeCancelSubscribeThrow超时(即死锁)
  • CCD_ 9和CCD_

数据读取器处理验证失败是一个时间问题——当foreach被留下时(通过异常或中断),相应的IEnumerator会立即被处理,这最终会取消可观察到的实现所使用的取消令牌。然而,该实现在另一个线程上运行,当它注意到取消时,单元测试已经结束。在实际的应用程序中,阅读器会被正确而及时地处理掉,但它与迭代的结束并不同步。我想知道是否可以使上述IEnumerator实例的处理等待,直到相应的IObservable实现注意到取消并且读取器被处理。

编辑

所以DbDataReader就是IEnumerable,这意味着如果希望同步枚举对象,那没问题。

但是,如果我想异步执行呢?在这种情况下,我被禁止列举阅读器——这是一种阻塞操作。唯一的出路是返回一个可观测的。其他人已经用比我更好的语言讨论了这个话题,例如——http://www.interact-sw.co.uk/iangblog/2013/11/29/async-yield-return

因此,我必须返回一个IObservable,并且不能使用ToObservable扩展方法,因为它取决于读取器的阻塞枚举。

接下来,给定一个IObservable,有人可能会将其转换为IEnumerable,这是愚蠢的,因为读者已经是IEnumerable,但仍然是可行和合法的。

编辑2

使用.NET Reflector(与VS集成)调试代码表明,流通过以下方法:

namespace System.Reactive.Threading.Tasks
{
public static class TaskObservableExtensions
{
...
private static void ToObservableDone<TResult>(Task<TResult> task, AsyncSubject<TResult> subject)
{
switch (task.get_Status())
{
case TaskStatus.RanToCompletion:
subject.OnNext(task.get_Result());
subject.OnCompleted();
return;
case TaskStatus.Canceled:
subject.OnError((Exception) new TaskCanceledException((Task) task));
return;
case TaskStatus.Faulted:
subject.OnError(task.get_Exception().get_InnerException());
return;
}
}
}
}

在异步订阅中,取消令牌和从OnNext抛出都会进入该方法(以及成功完成)。消去和抛出都收敛于CCD_ 23方法。该方法应该最终委托给OnError处理程序。但事实并非如此。

编辑3

以下是为什么从给定订阅者抛出时从未调用OnError回调?我现在想知道什么是满足以下目标的正确方法:

  1. 通过异步读取SqlDataReader实例公开可用对象
  2. 避免对象物化。具体化的选择应该掌握在API的调用方手中
  3. API应该可以在异步代码与同步代码混合的环境中使用。为什么?因为我们已经有了一个使用同步IO的服务器,我们希望用异步IO逐步淘汰同步阻塞IO

有了这些目标,我想出了这样的东西(参见单元测试代码):

private static IObservable<int> GetObservable(DbDataReader reader)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
using (reader)
{
while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
{
obs.OnNext((int)reader[0]);
}
}
});
}

这对你来说有意义吗?如果没有,有什么替代方案?

接下来,我想使用它,如Subscribe单元测试代码所示。然而,SubcribeCancelSubscribeThrow的结果表明这种使用模式是错误的。为什么从给定订阅者抛出时从未调用OnError回调?解释为什么它是错误的。

那么,什么是正确的方法呢?如何防止API的消费者不正确地消费(SubcribeCancelSubscribeThrow就是这种不正确消费的例子)。

订阅取消

由于cts取消,SubscribeCancel失败。这不会调用OnError处理程序。

取消您的cts等同于处理您的订阅。处理订阅会导致忽略所有未来的OnNextOnErrorOnCompleted调用。因此,任务永远不会完成,测试永远挂起。

解决方案:

取消cts时,请将Task设置为其正确状态。

SubscribeThrow

SubscribeThrow由于OnNext处理程序中的异常而失败。

OnNext处理程序中引发异常不会将异常转发到OnError处理程序。

解决方案:

不要在Subscribe处理程序中抛出异常。相反,处理您的订阅并将Task设置为其正确状态。

ToEnumerableForEachThrow&ToEnumerableForEachBreak

CCD_ 44和CCD_。

枚举对象上的foreach(...)将调用底层可观察对象上的dispose,这将取消取消令牌。在那之后,异常被测试的catch捕获(或者break只是退出foreach),在那里测试以查看底层读取器是否被处理。。。除了阅读器还没有被处理,因为可观察到的东西仍在等待阅读器产生下一个结果。。。只有在读取器让步(以及可观察的让步)之后,可观察的循环才会返回并检查取消令牌。在这一点上,可观察到的中断并退出使用块,并处理读取器。

解决方案:

Observable.Create返回一个Disposable,而不是您的using (...)语句。可丢弃的将在处理订阅时进行处理。这就是你想要的。一起去掉using语句,让Rx来完成它的工作。

相关内容

  • 没有找到相关文章