UPDATE:我在下面的回答中提供了问题的原因和解决方法。
我想为一个图像处理任务实现基于生产者-消费者方法的多线程。对于我来说,Producer
线程应该抓取图像并将它们放入container
,而消费者线程应该从Container
线程提取图像。我认为我应该使用queue
来实现container
。我想使用下面的代码建议在这个SO回答。但是我对container
的实现非常困惑,并将传入的图像放入Producer
线程中。
PROBLEM:第一个consumer thread
显示的图像不包含完整的数据。并且,第二个consumer thread
从不显示任何图像。可能是由于某些竞争情况或锁情况,导致第二个线程根本无法访问队列的数据。我已经试过使用Mutex
了。
#include <vector>
#include <thread>
#include <memory>
#include <queue>
#include <opencv2/highgui.hpp>
#include <opencv2/core.hpp>
#include <opencv2/imgproc.hpp>
Mutex mu;
struct ThreadSafeContainer
{
queue<unsigned char*> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
unsigned char *pt_src = image.data;
mu.lock();
container->safeContainer.push(pt_src);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
unsigned char *ptr_consumer_Image;
ptr_consumer_Image = container->safeContainer.front(); //The front of the queue contain the pointer to the image data
container->safeContainer.pop();
Mat image(400, 400, CV_8UC3);
image.data = ptr_consumer_Image;
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
int main()
{
//Pointer object to the class containing a "container" which will help "Producer" and "Consumer" to put and take images
auto ptrObject_container = make_shared<ThreadSafeContainer>();
//Pointer object to the Producer...intialize the "container" variable of "Struct Producer" with the above created common "container"
auto ptrObject_producer = make_shared<Producer>(ptrObject_container);
//FIRST Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto first_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//SECOND Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto second_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//RUN producer thread
thread producerThread(&Producer::run, ptrObject_producer);
//RUN first thread of Consumer
thread first_consumerThread(&Consumer::run, first_ptrObject_consumer);
//RUN second thread of Consumer
thread second_consumerThread(&Consumer::run, second_ptrObject_consumer);
//JOIN all threads
producerThread.join();
first_consumerThread.join();
second_consumerThread.join();
return 0;
}
我在你原来的问题中没有看到一个实际的问题,所以我将给你我在大学课程中用于实现生产者-消费者的参考材料。
http://cs360.byu.edu/static/lectures/winter-2014/semaphores.pdf幻灯片13和17给出了生产者-消费者的好例子
我在实验室里使用了这个,我在这里发布了我的github:https://github.com/qzcx/Internet_Programming/tree/master/ThreadedMessageServer
如果你看我的服务器。您可以看到我对生产者-消费者模式的实现。
请记住,使用此模式时,您不能切换等待语句的顺序,否则可能会导致死锁。
希望对大家有帮助。
编辑:好了,下面是上面链接的代码中消费者-生产者模式的总结。生产者消费者背后的想法是有一个线程安全的方式将任务从"生产者"线程传递到"消费者"工作线程。在我的示例中,要做的工作是处理客户机请求。生产者线程(.serve())监视传入的套接字,并将连接传递给消费者线程(.handle()),以便在实际请求进入时进行处理。此模式的所有代码都可以在服务器中找到。Cc文件(在server.h中有一些声明/导入)。
为了简短起见,我省略了一些细节。一定要通读每一行,了解是怎么回事。查找我正在使用的库函数和参数的含义。我在这里给了你很多帮助,但你还有很多工作要做,以获得充分的理解。制作人:
就像我上面提到的,整个生产者线程都在.serve()函数中。它做以下事情- 初始化信号量。由于操作系统的不同,这里有两个版本。我在OS X上编程,但必须在Linux上提交代码。由于信号量与操作系统相关联,因此了解如何在特定设置中使用信号量非常重要。
- 它为客户端设置可与之通信的套接字。
- 创建消费者线程。
- 监视客户端套接字并使用生产者模式将项传递给消费者。这段代码在 下面
在.serve()函数的底部,您可以看到以下代码:
while ((client = accept(server_,(struct sockaddr *)&client_addr,&clientlen)) > 0) {
sem_wait(clients_.e); //buffer check
sem_wait(clients_.s);
clients_.q->push(client);
sem_post(clients_.s);
sem_post(clients_.n); //produce
}
首先,检查缓冲区信号量"e",以确保队列中有空间放置请求。其次,为队列获取信号量"s"。然后将任务(在本例中为客户端连接)添加到队列中。释放队列的信号量。最后,使用信号量"n"向消费者发送信号。
消费者:在.handle()方法中,您实际上只关心线程的最开始。
while(1){
sem_wait(clients_.n); //consume
sem_wait(clients_.s);
client = clients_.q->front();
clients_.q->pop();
sem_post(clients_.s);
sem_post(clients_.e); //buffer free
//Handles the client requests until they disconnect.
}
消费者的行为与生产者相似,但方式相反。首先,消费者等待生产者发出信号量"n"。请记住,由于有多个消费者,因此最终哪个消费者可能获得该信号量完全是随机的。它们会争夺这个位置,但是只有一个可以在sem_post中移动过这个点。其次,它们像生产者一样获取队列信号量。从队列中取出第一项并释放信号量。最后,它们在缓冲区信号量"e"上发出信号,表示缓冲区中现在有更多的空间。
免责声明:
我知道信号量的名字很糟糕。它们与我教授的幻灯片相匹配,因为我是在那里学的。我认为它们代表以下内容:
- e for empty:如果队列已满,这个信号量将阻止生产者向队列中推送更多的项目。
- s用于信号量:我最不喜欢的。但我的教授的风格是每个共享数据结构都有一个结构体。在本例中,"clients_"是包含所有三个信号量和队列的结构体。基本上,这个信号量的存在是为了确保没有两个线程同时接触到相同的数据结构。
- n表示队列中的项数。
好的,让它越简单越好。你将需要2个线程,互斥锁,队列和2个线程处理函数。
Header.h
static DWORD WINAPI ThreadFunc_Prod(LPVOID lpParam);
static DWORD WINAPI ThreadFunc_Con(LPVOID lpParam);
HANDLE m_hThread[2];
queue<int> m_Q;
mutex m_M;
添加所有需要的东西,这些只是你需要的核心部分
Source.cpp
DWORD dwThreadId;
m_hThread[0] = CreateThread(NULL, 0, this->ThreadFunc_Prod, this, 0, &dwThreadId);
// same for 2nd thread
DWORD WINAPI Server::ThreadFunc_Prod(LPVOID lpParam)
{
cYourClass* o = (cYourClass*) lpParam;
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock();
m_Q.push(nData2Q);
m_M.unlock();
}
DWORD WINAPI Server::ThreadFunc_Con(LPVOID lpParam)
{
cYourClass* o = (cYourClass*) lpParam;
int res;
m_M.lock();
if (m_Q.empty())
{
// bad, no data, escape or wait or whatever, don't block context
}
else
{
res = m_Q.front();
m_Q.pop();
}
m_M.unlock();
// do you magic with res here
}
在main的末尾,不要忘记使用WaitForMultipleObjects
所有可能的例子都可以直接在MSDN中找到,所以对此有很好的评论。
第二部分:我相信header是不言自明的,所以我会给你们更多的描述。在源代码的某个地方(甚至可以在构造函数中)创建线程-如何创建线程的方式可能不同,但思想是相同的(在win - thread在posix中创建后立即运行,您必须加入)。我相信你们应该在某个地方有一个函数,它启动了你们所有的魔法,我们叫它MagicKicker()
在posix的情况下,在构造函数中创建线程并在MagicKicker()
中连接它们,在MagicKicker()
中创建线程
然后你需要声明(在头)两个函数,你的线程函数将实现ThreadFunc_Prod
和ThreadFunc_Prod
,这里重要的魔力是,你将传递引用到你的对象到这个函数(因为线程基本上是静态的),所以你可以很容易地访问共享资源,如队列,互斥锁等…这些函数实际上在做功。你实际上已经在你的代码中所有你需要的,只是使用这个作为添加例程在生产者:
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock(); // locks mutex so nobody cant enter mutex
m_Q.push(nData2Q); // puts data from producer to share queue
m_M.unlock(); // unlock mutex so u can access mutex in your consumer
并将此添加到您的消费者:
int res;
m_M.lock(); // locks mutex so u cant access anything wrapped by mutex in producer
if (m_Q.empty()) // check if there is something in queue
{
// nothing in you queue yet OR already
// skip this thread run, you can i.e. sleep for some time to build queue
Sleep(100);
continue; // in case of while wrap
return; // in case that u r running some framework with threadloop
}
else // there is actually something
{
res = m_Q.front(); // get oldest element of queue
m_Q.pop(); // delete this element from queue
}
m_M.unlock(); // unlock mutex so producer can add new items to queue
// do you magic with res here
我的问题中提到的问题是Consumer thread
显示的图像不包含完整的数据。Consumer thread
显示的图像中有几个补丁,可能无法获得Producer thread
生成的全部数据。
ANSWER背后的原因是Consumer thread
的while loop
里面声明了Mat image
。在while loop
中创建的Mat
实例一旦第二轮while loop
开始就会被删除,因此Producer thread
永远无法访问在Consumer thread
中创建的Mat image
的数据。
解决方案:我应该这样做的
struct ThreadSafeContainer
{
queue<Mat> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
mu.lock();
container->safeContainer.push(Mat);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
Mat image= container->safeContainer.front(); //The front of the queue contain the image
container->safeContainer.pop();
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};