使用Unbounded Parallelism多次运行同一ActionBlock



我正在学习TPL数据流,我通过我的一些朋友看到了它的强大功能,我的实现遇到了问题。

我想要/需要的是尽可能快地发送信息。我正在LinqPad中做一些原型设计,这就是我迄今为止所拥有的:

// Holds all the messages for my loadMessage ActionBlock to grab its data from
var bufferBlock = new BufferBlock<string>();
// Sends message to where it needs to go as fast as it can.
var loadMessage = new ActionBlock<string>(msg => 
{
msg.Dump();
},  
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
// Links the blocks together
bufferBlock.LinkTo(loadMessage);
// Loads the Buffer
for (int i = 0; i < 10; i++)
{
bufferBlock.Post("This is a message");
}
//Calls completion to stop threads
bufferBlock.Complete();
loadMessage.Complete(); 

问题是loadMessageBlock没有在上面的例子中转储消息。我一直在寻找一些见解,但运气不佳。我想我错过了TPL的一些基本要素。我的理解是,BufferBlock保存要由其他块处理的信息,ActionBlocked(链接到BufferBlock)应该从缓冲区中获取数据并做它需要做的事情。在将信息放在缓冲区上的For循环停止完成后,调用它来停止线程。

在我的实现中,我有一个Parallel。因为它可以很好地运行loadMessage中的代码。我只是无法实现TPL来做我想做的事情,我的理解是TPL将比Parallel.For更快。

我认为这是怎么回事吗?我是不是用错了TPL?我将继续研究答案,任何建议都将不胜感激。谢谢

首先,请注意术语:TPL(任务并行库的缩写)与TPL数据流不同,它只是一个子集。TPL作为一个整体包括CCD_ 1和CCD_。

现在,您的代码的问题是您过早地完成了loadMessage块。呼叫Complete()后,块将不再接受任何消息,因此您发布到bufferBlock的消息将永远不会到达loadMessage

你需要的是只有在bufferBlock向它发送所有消息后才能完成loadMessage。这正是PropagateCompletion所做的:

bufferBlock.LinkTo(
loadMessage, new DataflowLinkOptions { PropagateCompletion = true });
// post your data to bufferBlock here
bufferBlock.Complete();
await loadMessage.Completion;

此外,在这种特定情况下,根本不需要Parallel.For()0,您可以直接将消息发布到loadMessage

我的理解是TPL将比Parallel更快。对于

我不明白为什么总的来说应该更快。在正常情况下,他们的表现应该具有可比性。所以你应该使用更适合你问题的,而不是因为"更快"而选择一个。如果你真的那么关心性能,那么就用两种方式编写代码,并衡量哪一种更好。

  1. 我想要/需要的是尽可能快地发送消息:

    要实现这一点,您需要同时向缓冲区块发布数据和从缓冲区块接收数据。以下是片段:

    var bufferBlock = new BufferBlock<string>();
    // Write to and read from the message block concurrently. 
    var post01 = Task.Run(() =>
    {
    // Loads the Buffer
    for (int i = 0; i < 10; i++)
    {
    bufferBlock.Post(string.Format("This is a message {0}",i));
    }
    });
    var receive = Task.Run(() =>
    {
    for (int i = 0; i < 10; i++)
    {
    var message = bufferBlock.Receive();
    message.Dump();
    }
    });
    
    Task.WaitAll(post01, receive);
    

    你可以在MSDN链接上看到更多关于这方面的信息

  2. 我的理解是,TPL将比Parallel.For.更快

    这是不正确的,因为它们使用相同的底层结构。它们属于同一名称空间System.Threading.Tasks

最新更新