链接的数据流块如何取消目标块



在TPL数据流中,当一个块通过传播链接到另一个块时,它将转发异常和取消。我可以想象,转发异常只是通过使用dataFlowBlock.Fault(exception)来完成的,但我很好奇取消是如何转发的,因为没有dataFlowBlock.Cancel()这样的东西。是否通过相同的Fault()方法使用传递TaskCancelledException作为参数来完成?

更新:

为了澄清,请考虑以下示例,其中只有块1通过选项使用CancelationToken创建,而块2没有。区块1通过传播链接到区块2:

block1 { CancellationToken = ct } -> block2 { }

当ct接收到取消请求时,块1完成转换为取消。我的问题是block2在这一点上会发生什么?block1是否主动取消block2?如果是,是否使用block.Fault(TaskCanceledException)?或者它使用了一些内部焦点pocus,即使它是在没有取消令牌的情况下创建的,也能神奇地取消block2?

好吧,我想我们对更新后的帖子是一致的。简而言之,如果传播不是真的,则取消不会流到链接块上。

[TestFixture]
public class DataFlowTests
{
[Test]
public async Task DataflowTest()
{
var cts = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = false});
foreach (var data in Enumerable.Range(0, 20))
{
if (data > 10) break;
await buffer.SendAsync(data);
}
cts.Cancel();
//action.Complete();
await action.Completion;
Console.WriteLine(buffer.Completion.Status);
Console.WriteLine(action.Completion.Status);
}
}

这个样品将永远挂在那里等待action的完成。现在在ActionBlock<>上显式调用Complete()会产生以下结果:

Cancelled - buffer
RanToCompletion - action

最后传播完成产生相同的结果,而不必手动调用下游块上的完成:

[TestFixture]
public class DataFlowTests
{
[Test]
public async Task DataflowTest()
{
var cts = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true});
foreach (var data in Enumerable.Range(0, 20))
{
if (data > 10) break;
await buffer.SendAsync(data);
}
cts.Cancel();
await action.Completion;
Console.WriteLine(buffer.Completion.Status);
Console.WriteLine(action.Completion.Status);
}
}

收益率状态:

Canceled - buffer
RanToCompletion - action

请注意,取消不会导致整个管道被取消。其他区块简单完成。现在,如果出现除取消通知之外的异常,则管道将通过...Fault(..)出现故障。否则它会突出标准完成传播。

取消以选项CancellationToken的形式出现在ExecutionDataflowBlockOptions上。传递的令牌将成功完成块。这意味着,这些块将以不同于管道中抛出的其他异常的方式处理OperationCanceledException,并导致这些块只完成一次处理。取消旨在通过共享单个CTS并将相关联的令牌使用到块选项中来工作。然后,当CTS被取消时,所有块都获得取消信号。

也可以在这里找到更多来自MS

在评论中指出你的实际问题,这可能会解释得更多。

当显式取消数据流块时,AggregateException对象在InnerException属性中包含OperationCanceledException

还有:

TPL提供了一种机制,使任务能够以协作的方式协调取消。要使数据流块能够参与此取消机制,请设置CancellationToken属性。当此CancellationToken对象设置为已取消状态时,监视此令牌的所有数据流块都会完成其当前项的执行,但不会开始处理后续项。这些数据流块还清除任何缓冲的消息,释放到任何源块和目标块的连接,并转换到取消状态。通过转换到已取消状态,"完成"属性的"状态"属性设置为"已取消",除非在处理过程中发生异常。在这种情况下,Status设置为Faulted。

这是大量引用的文本,但提供了源链接,解释写得很好。

因此,这些块不会主动传播取消。但是,一旦取消的块完成,在传播为true的情况下,将使完成任务与取消或故障状态一起流动。

无论何时块将其完成传播到链接块,都可以在此处或此处找到运行的源代码。以下是该代码的简化版本:

internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target)
{
AggregateException exception =
sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
}

没有关于传播取消的规定。如果块在故障状态下完成,则异常会传播到链接块。在任何其他情况下,链接块只是标记为完成。

AFAIK块转换到取消状态的唯一情况是在创建时提供的CancellationToken被取消时。例如,如果您尝试从其lambda函数内部调用throw new OperationCanceledException(),则不会发生任何事情,处理后的消息将被忽略。

最新更新