在TPL数据流中,当一个块通过传播链接到另一个块时,它将转发异常和取消。我可以想象,转发异常只是通过使用dataFlowBlock.Fault(exception)
来完成的,但我很好奇取消是如何转发的,因为没有dataFlowBlock.Cancel()
这样的东西。是否通过相同的Fault()
方法使用传递TaskCancelledException
作为参数来完成?
更新:
为了澄清,请考虑以下示例,其中只有块1通过选项使用CancelationToken
创建,而块2没有。区块1通过传播链接到区块2:
block1 { CancellationToken = ct } -> block2 { }
当ct接收到取消请求时,块1完成转换为取消。我的问题是block2在这一点上会发生什么?block1是否主动取消block2?如果是,是否使用block.Fault(TaskCanceledException)
?或者它使用了一些内部焦点pocus,即使它是在没有取消令牌的情况下创建的,也能神奇地取消block2?
好吧,我想我们对更新后的帖子是一致的。简而言之,如果传播不是真的,则取消不会流到链接块上。
[TestFixture]
public class DataFlowTests
{
[Test]
public async Task DataflowTest()
{
var cts = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = false});
foreach (var data in Enumerable.Range(0, 20))
{
if (data > 10) break;
await buffer.SendAsync(data);
}
cts.Cancel();
//action.Complete();
await action.Completion;
Console.WriteLine(buffer.Completion.Status);
Console.WriteLine(action.Completion.Status);
}
}
这个样品将永远挂在那里等待action
的完成。现在在ActionBlock<>
上显式调用Complete()
会产生以下结果:
Cancelled - buffer
RanToCompletion - action
最后传播完成产生相同的结果,而不必手动调用下游块上的完成:
[TestFixture]
public class DataFlowTests
{
[Test]
public async Task DataflowTest()
{
var cts = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true});
foreach (var data in Enumerable.Range(0, 20))
{
if (data > 10) break;
await buffer.SendAsync(data);
}
cts.Cancel();
await action.Completion;
Console.WriteLine(buffer.Completion.Status);
Console.WriteLine(action.Completion.Status);
}
}
收益率状态:
Canceled - buffer
RanToCompletion - action
请注意,取消不会导致整个管道被取消。其他区块简单完成。现在,如果出现除取消通知之外的异常,则管道将通过...Fault(..)
出现故障。否则它会突出标准完成传播。
取消以选项CancellationToken
的形式出现在ExecutionDataflowBlockOptions
上。传递的令牌将成功完成块。这意味着,这些块将以不同于管道中抛出的其他异常的方式处理OperationCanceledException
,并导致这些块只完成一次处理。取消旨在通过共享单个CTS并将相关联的令牌使用到块选项中来工作。然后,当CTS被取消时,所有块都获得取消信号。
也可以在这里找到更多来自MS
在评论中指出你的实际问题,这可能会解释得更多。
当显式取消数据流块时,AggregateException对象在InnerException属性中包含OperationCanceledException
还有:
TPL提供了一种机制,使任务能够以协作的方式协调取消。要使数据流块能够参与此取消机制,请设置CancellationToken属性。当此CancellationToken对象设置为已取消状态时,监视此令牌的所有数据流块都会完成其当前项的执行,但不会开始处理后续项。这些数据流块还清除任何缓冲的消息,释放到任何源块和目标块的连接,并转换到取消状态。通过转换到已取消状态,"完成"属性的"状态"属性设置为"已取消",除非在处理过程中发生异常。在这种情况下,Status设置为Faulted。
源
这是大量引用的文本,但提供了源链接,解释写得很好。
因此,这些块不会主动传播取消。但是,一旦取消的块完成,在传播为true的情况下,将使完成任务与取消或故障状态一起流动。
无论何时块将其完成传播到链接块,都可以在此处或此处找到运行的源代码。以下是该代码的简化版本:
internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target)
{
AggregateException exception =
sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
}
没有关于传播取消的规定。如果块在故障状态下完成,则异常会传播到链接块。在任何其他情况下,链接块只是标记为完成。
AFAIK块转换到取消状态的唯一情况是在创建时提供的CancellationToken
被取消时。例如,如果您尝试从其lambda函数内部调用throw new OperationCanceledException()
,则不会发生任何事情,处理后的消息将被忽略。