我正在尝试使用数据流块,我需要监视通过的项目以进行单元测试。
为了做到这一点,我在我的TransformBlock<Tinput, T>
的ISourceBlock<T>
上使用AsObservable()
方法,这样我就可以在执行后检查我的管道的每个块是否都生成了预期的值。
管道
{
...
var observer = new MyObserver<string>();
_block = new TransformManyBlock<string, string>(MyHandler, options);
_block.LinkTo(_nextBlock);
_block.AsObservable().Subscribe(observer);
_block.Post("Test");
...
}
MyObserver
public class MyObserver<T> : IObserver<T>
{
public List<Exception> Errors = new List<Exception>();
public bool IsComplete = false;
public List<T> Values = new List<T>();
public void OnCompleted()
{
IsComplete = true;
}
public void OnNext(T value)
{
Values.Add(value);
}
public void OnError(Exception e)
{
Errors.Add(e);
}
}
所以基本上,我把我的观察者订阅到transformblock,我希望通过的每个值都在我的观察器中注册;值";列表
但是,当IsComplete
被设置为true并且OnError()
成功注册异常时,CCD_ 6方法永远不会被调用,除非它是管道的最后一个块。。。我不明白为什么,因为;下一块";链接到此源块成功接收数据,证明某些数据正在退出该块。
据我所知,AsObservable
应该报告退出块的每个值,而不仅仅是未被其他链接块消耗的值。。。
我做错了什么?
在您有机会阅读消息之前,您的消息已被_nextBlock
消耗。
如果你把_block.LinkTo(_nextBlock);
这行注释掉,它可能会起作用。
AsObservable
的唯一目的只是允许块从RX中被消耗。它不会将块的内部工作更改为向多个目标广播消息。你需要为BroadcastBlock
提供一个特殊的块
我建议将广播到另一个块,并将其用于Subscribe
BroadcastBlock的使命是启用从获取已发布的每个元素的副本的块
var options = new DataflowLinkOptions {PropagateCompletion = true};
var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));
broadcastBlock.LinkTo(bufferBlock, options);
broadcastBlock.LinkTo(actionBlock, options);
bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));
for (var i = 0; i < 5; i++)
await broadcastBlock.SendAsync(i.ToString());
broadcastBlock.Complete();
await actionBlock.Completion;
输出
peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4