使用BlockingCollection通过生产者-消费者模式保存图像



我面临一个生产者-消费者问题:我有一台相机,它可以非常快速地发送图像,我必须将它们保存到磁盘上。图像的形式为CCD_ 1。摄影机始终覆盖类型为ushort[]的相同变量。因此,在一次采集和另一次采集之间,我必须复制阵列,并在可能的情况下保存它,以释放图像的内存。重要的是不要丢失相机中的任何图像,即使这意味着要增加使用的内存:消费者(在释放内存的情况下保存图像(比生产者慢是完全可以接受的;但是,不能及时将图像复制到内存中

我已经编写了应该模拟问题的示例代码:

  • immage_ushort:是相机生成的图像,必须在下一个图像到达之前复制到BlockingCollection
  • producerTask:具有一个周期,该周期应模拟每次等待图像的到达;在这段时间内,制作者应该在BlockingCollection中复制图像
  • consumerTask:必须通过将图像保存到磁盘来处理BlockingCollection,从而释放内存;消费者的工作速度是否比生产者慢并不重要

我放了一个1毫秒的time_wait来测试性能(实际上相机无法达到这个速度(。如果代码中没有保存到磁盘(注释image1.ImWrite (file_name)(,则遵守时间(最大延迟为1-2ms,因此可以接受(。但由于保存到磁盘上,我反而会遇到有时超过100毫秒的延迟。

这是我的代码:

private void Execute_test_producer_consumer1()
{
//Images are stored as ushort array, so we create a BlockingCollection<ushort[]>
//to keep images when they arrive from camera
BlockingCollection<ushort[]> imglist = new BlockingCollection<ushort[]>();
string lod_date = "";
/*producerTask simulates a camera that returns an image every time_wait
milliseconds. The image is copied and inserted in the BlockingCollection 
to be then saved on disk in the consumerTask*/
Task producerTask = Task.Factory.StartNew(() =>
{
//Number of images to process
int num_img = 3000;
//Time between one image and the next
long time_wait = 1;
//Time log variables
var watch1 = System.Diagnostics.Stopwatch.StartNew();
long watch_log = 0;
long delta_time = 0;
long timer1 = 0;
List<long> timer_delta_log = new List<long>();
List<long> timer_delta_log_time = new List<long>();
int ii = 0;
Console.WriteLine("-----START producer");
watch1.Restart();
//Here I expect every wait_time (or a little more) an image will be inserted
//into imglist
while (ii < num_img)
{
timer1 = watch1.ElapsedMilliseconds;
delta_time = timer1 - watch_log;
if (delta_time >= time_wait || ii == 0)
{
//Add image
imglist.Add((ushort[])immage_ushort.Clone());
//Inserting data for time log
timer_delta_log.Add(delta_time);
timer_delta_log_time.Add(timer1);
watch_log = timer1;
ii++;
}
}
imglist.CompleteAdding();
watch1.Stop();
lod_date = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
Console.WriteLine("-----END producer: " + lod_date);
// We only print images that are not inserted on schedule
int gg = 0;
foreach (long timer_delta_log_t in timer_delta_log)
{
if (timer_delta_log_t > time_wait)
{
Console.WriteLine("-- Image " + (gg + 1) + ", delta: "
+ timer_delta_log_t + ", time: " + timer_delta_log_time[gg]);
}
gg++;
}
});
Task consumerTask = Task.Factory.StartNew(() =>
{
string file_name = "";
int yy = 0;
// saving images and removing data
foreach (ushort[] imm in imglist.GetConsumingEnumerable())
{
file_name = @"output/" + yy + ".png";
Mat image1 = new Mat(row, col, MatType.CV_16UC1, imm);
//By commenting on this line, the timing of the producer is respected
image1.ImWrite(file_name);
image1.Dispose();
yy++;
}
imglist.Dispose();
lod_date = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
Console.WriteLine("-----END consumer: " + lod_date);
});
}

我还认为,BlockingCollection可以在foreach的整个持续时间内保持阻塞,从而将映像保存到磁盘。所以我也试着用这个替换ushort[]0:

while(!imglist.IsCompleted)
{
ushort[] elem = imglist.Take();
file_name = @"output/" + yy + ".png";
Mat image1 = new Mat(row, col, MatType.CV_16UC1, elem);
//By commenting on this line, the timing of the producer is respected
image1.ImWrite(file_name);
image1.Dispose();
yy++;
}

但结果并没有改变。

我做错了什么?

您可能希望使用;LongRunning;选项:

LongRunning指定任务将是一个长时间运行的粗粒度操作,与细粒度系统相比,该操作涉及更少、更大的组件。它向TaskScheduler提供了一个提示,即可能会保证超额订阅。超额订阅使您可以创建比可用硬件线程数更多的线程它还向任务调度程序提供了一个提示,即任务可能需要一个额外的线程,这样它就不会阻止本地线程池队列上其他线程或工作项的前进

最新更新