将动态创建的ActionBlock链接到BufferBlock



我不确定这是否可能,但如果可能,我可能做得不对。假设我有一个共享缓冲区,它链接到许多消费者(ActionBlocks)。每个使用者都应该使用满足用于将其链接到缓冲区的谓词的数据。例如,ActionBlock1应该消耗满足x => x % 5 == 0的数字,ActionBlock2应该只消耗x => x % 5 == 1等。

这是我得到的:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
for (int i = 0; i < NumProductionLines; i++)
{
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
}
return productionQueue;
}

然后我打电话给

Random rnd = new Random();
ITargetBlock<int> temp = BuildPipeline(5);
while (true)
{
temp.Post(rnd.Next(255));
}

然而,这并不奏效。控制台中不显示任何输出。如果我将BuildPipeline方法修改为:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));
productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
productionQueue.LinkTo(productionLine5, x => x % 5 == 4);
return productionQueue;
}

代码执行预期的操作。

有人能解释为什么动态创建和链接动作块不起作用吗?

附言:如果我在ITargetBlock<int> temp = BuildPipeline(5);之后立即进入代码,那么temp确实显示有5个目标链接到缓冲区。每个目标的Id不同。

提前感谢

编辑:添加了svick建议的更改,但仍然不好:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
for (int i = 0; i < NumProductionLines; i++)
{
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
int j = i;
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
}
ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num));
productionQueue.LinkTo(discardedLine);
return productionQueue;
}

现在只有第二条生产线处理数据(满足x%5==1谓词的生产线)。数据不满足谓词,这意味着我得到的数字以9和7结尾。

编辑:工作代码如下所示:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
for (int i = 0; i < NumProductionLines; i++)
{
int j = i;
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num));
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
}
productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
return productionQueue;
}

问题是在您的第一个版本中,您对每个目标块使用相同的谓词。换句话说,谓词不依赖于i

但是,即使是这样,您的代码也不会工作,因为i变量在谓词之间是共享的,所以它们都将使用最后一个值。解决方法是将i复制到一个局部变量中,并在谓词中使用它。

代码可能看起来像这样:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
for (int i = 0; i < NumProductionLines; i++)
{
int iCopy = i;
ActionBlock<int> productionLine = new ActionBlock<int>(
num => Console.WriteLine("Processed by line {0}: {1}", iCopy + 1, num));
productionQueue.LinkTo(
productionLine, x => x % NumProductionLines == iCopy);
}
return productionQueue;
}

如果你问为什么你的代码不至少处理x % 5 == 1数字,那是因为随机生成器可能会生成一个与该谓词不匹配的数字,所以没有一个ActionBlock会接受它。正因为如此,这个数字将留在源块的输出队列中,其他数字将无法通过。

如果在你的真实代码中,类似的情况可能会发生,并且你想丢弃所有不符合任何谓词的数字,那么在你将源块链接到所有有用的块之后,你可以将它链接到一个什么都不做、什么都接受的块:

productionQueue.LinkTo(DataflowBlock.NullTarget<int>());

最新更新