TPL数据流仅在处理完所有数据后才传播完成



我有一个生产者向BufferBlock发送数据,当所有数据都已从源读取时,它调用Complete()

默认行为是,当调用完成时,即使缓冲区仍有消息,它也会将完成传播到管道中。

是否需要等待来告诉块:只在缓冲区为空时传播完成

当完成发生时,我在Receive:InvalidOperationException: 'The source completed without providing data to receive.'上得到一个异常

我目前正在使用:

var bufferBlock = new BufferBlock<string>();
var transformBlock = new TransformBlock<string, string>(s =>
{
Thread.Sleep(50);
return s;
});
bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var i in Enumerable.Range(0, 10))
bufferBlock.Post(i.ToString());
bufferBlock.Complete();
while (!transformBlock.Completion.IsCompleted)
Console.WriteLine(transformBlock.Receive());

为了避免它,我目前正在使用:

while (bufferBlock.Count > 0)
await Task.Delay(100);
bufferBlock.Complete();

这听起来并不是一个真正干净的解决方案。

这是比赛条件吗?I.E.标记为未完成的阻塞,以及在我呼叫时完成的阻塞接收?

我想我可以用block.OutputAvailableAsync代替!transformBlock.Completion.IsCompleted,对吗?

要等待管道的完成,您应该等待管道中最后一个块的完成任务。在这种情况下,您应该将代码更改为:

foreach (var i in Enumerable.Range(0, 10))
bufferBlock.Post(i.ToString());
bufferBlock.Complete();
await transformBlock.Completion;

这在完成管道和等待管道完成演练:创建数据流管道的段落中得到了演示

TransformBlock已经有了一个缓冲区,这意味着任何发布到输入BufferBlock的内容都将立即发送到TransformBlock。最好使用不同的块进行测试。演练展示了一个很好的例子:一个transformBlock用于下载页面内容,另一个用于解析页面内容等。

只是要小心各种。。。。不幸的编码实践,比如每次都创建一个新的HttpClient实例。下载程序可以更改为:

var httpClient=new HttpClient();
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await httpClient.GetStringAsync(uri);
});

是的,手动从块中检索消息的正确方法是使用OutputAvailableAsync方法,并结合TryReceive:

while (await transformBlock.OutputAvailableAsync())
{
while (transformBlock.TryReceive(out var item))
{
Console.WriteLine(item);
}
}
await transformBlock.Completion; // Required to propagate exceptions

属性BufferBlock.CountTransformBlock.OutputCount等仅适用于监视和统计。在大多数情况下,使用它们来控制数据流是可能的竞争条件和潜在错误的指示。

最新更新