异步任务,视频缓冲



我试图理解C#中的任务,但仍然存在一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我使用的是Emgu.CV(,并通过TCP/IP将其发送到板中进行处理,然后以流(实时(方式返回。首先,我是连续做的。因此,读取Bitmap,从板上发送接收,并绘制。但是读取位图并绘制它们需要花费太多时间。我想有一个传输、接收FIFO缓冲区来保存视频帧,还有一个不同的任务来完成发送和接收每一帧的工作。所以我想同时进行。我想我应该创建3个任务:

tasks.Add(Task.Run(() => Video_load(video_path)));
tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
tasks.Add(Task.Run(() => VideoDisp_hw(32)));

我想运行";平行";。我应该使用什么类型的对象?并发队列?BufferBlock?还是只是一份清单?

谢谢你的建议!我想问个问题。我正在尝试创建一个简单的控制台程序,其中包含2个TPL块。1块将是Transform块(获取消息,即"开始"(并将数据加载到List,另一块将是ActionBlock(只是从列表中读取数据并打印它们(。下面是代码:

namespace TPL_Dataflow
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
Random randn = new Random();
var loadData = new TransformBlock<string, List<int>>(async sample_string =>
{
List<int> input_data = new List<int>();
int cnt = 0;
if (sample_string == "start")
{
Console.WriteLine("Inside loadData");
while (cnt < 16)
{
input_data.Add(randn.Next(1, 255));
await Task.Delay(1500);
Console.WriteLine("Cnt");
cnt++;
}
}
else
{
Console.WriteLine("Not started yet");
}
return input_data;
});

var PrintData = new ActionBlock<List<int>>(async input_data =>
{
while(input_data.Count > 0)
{

Console.WriteLine("output Data = " + input_data.First());
await Task.Delay(1000);
input_data.RemoveAt(0);

}

});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loadData.LinkTo(PrintData, input_data => input_data.Count() >0  );
//loadData.LinkTo(PrintData, linkOptions);

loadData.SendAsync("start");
loadData.Complete();
PrintData.Completion.Wait();
}
}
}

但它似乎是以串行方式工作的。。我做错了什么?我试着做while循环异步。我想同时做这两件事。当列表中的数据可用时进行打印。

您可以使用TransformManyBlock<string, int>作为生产者块,使用ActionBlock<int>作为消费者块。TransformManyBlock将使用接受Func<string, IEnumerable<int>>委托的构造函数进行实例化,并传递迭代器方法(下面示例中的Produce方法(,该方法逐个生成值:

Random random = new Random();
var producer = new TransformManyBlock<string, int>(Produce);
IEnumerable<int> Produce(string message)
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
yield return value;
Thread.Sleep(1500);
cnt++;
}
}
else
{
yield break;
}
}
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
producer.LinkTo(consumer, new() { PropagateCompletion = true });
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

不幸的是,生产者不得不在产生每个值(Thread.Sleep(1500);(之间的空闲时间段阻塞工作线程,因为TransformManyBlock当前没有接受Func<string, IAsyncEnumerable<int>>的构造函数。这可能会在TPL数据流库的下一个版本中得到修复。您可以跟踪此GitHub问题,以便了解此功能何时发布。


替代解决方案:您可以保持生产者和消费者的非链接,并手动将生产者产生的值发送给消费者,而不是显式地链接生产者和消费者。在这种情况下,两个块都将是ActionBlocks:

Random random = new Random();
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
var producer = new ActionBlock<string>(async message =>
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
var accepted = await consumer.SendAsync(value);
if (!accepted) break; // The consumer has failed
await Task.Delay(1500);
cnt++;
}
}
});
PropagateCompletion(producer, consumer);
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
}

这种方法的主要困难是如何将生产者的完成传播给消费者,从而最终完成两个块。显然,您不能使用new DataflowLinkOptions { PropagateCompletion = true }配置,因为块没有显式链接。您也不能手动Complete消费者,因为在这种情况下,它将停止过早地接受生产者的值。这个问题的解决方案是上述示例中所示的PropagateCompletion方法。