threadpool.h
#ifndef THREADPOOL
#define THREADPOOL
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <vector>
/// @brief Fixed-size pool of worker threads that call `process()` on queued
///        requests.
///
/// The pool stores raw, NON-OWNING `T*` pointers: the caller must keep every
/// appended request alive until a worker has finished processing it.
///
/// `append()` and `printQueueSize()` may be called from any thread. The
/// destructor stops the workers and joins them; requests still waiting in the
/// queue at that point are discarded without being processed (they are never
/// dereferenced, so they may already be gone).
///
/// @tparam T Request type; must provide a callable `process()` member.
template <typename T>
class Threadpool
{
public:
    /// Number of requests processed so far by all pools of this `T`.
    /// Atomic because it is shared by every worker of every instance and is
    /// typically read by the caller while workers are still running.
    static std::atomic<int> nums;

    /// @brief Creates a pool with 8 workers and a queue capacity of 100.
    Threadpool() : Threadpool(8, 100)
    {
    }

    /// @brief Creates the pool and starts its worker threads.
    /// @param threadSize   Number of worker threads (negative is treated as 0).
    /// @param queueMaxsize Maximum number of pending requests (negative is
    ///                     treated as 0).
    /// @throws std::system_error if a worker thread cannot be started; workers
    ///         that were already started are stopped and joined first.
    Threadpool(int threadSize, int queueMaxsize)
        : _threadSize(threadSize > 0 ? threadSize : 0),
          _queueMaxsize(queueMaxsize > 0 ? queueMaxsize : 0),
          m_stop(false)
    {
        threads.reserve(static_cast<std::size_t>(_threadSize));
        try
        {
            for (int i = 0; i < _threadSize; ++i)
            {
                threads.emplace_back([this, i] { run(i); });
            }
        }
        catch (...)
        {
            // Destroying a joinable std::thread calls std::terminate, so the
            // workers that did start must be joined before propagating.
            shutdown();
            throw;
        }
    }

    /// @brief Stops and joins all workers; pending requests are dropped.
    ~Threadpool()
    {
        shutdown();
    }

    // Workers capture `this`, so the pool must never be copied.
    Threadpool(const Threadpool &) = delete;
    Threadpool &operator=(const Threadpool &) = delete;

    /// @brief Queues a request for processing by one of the workers.
    /// @param request Non-owning pointer; must remain valid until processed.
    /// @return true if the request was queued; false if `request` is null,
    ///         the queue already holds `queueMaxsize` requests, or the pool is
    ///         shutting down.
    bool append(T *request)
    {
        if (request == nullptr)
        {
            return false;
        }
        {
            // The capacity check and the push must happen under the same lock,
            // otherwise concurrent producers can overshoot the limit.
            std::lock_guard<std::mutex> guard(queueLock);
            if (m_stop || workQueue.size() >= static_cast<std::size_t>(_queueMaxsize))
            {
                return false;
            }
            workQueue.push_back(request);
        }
        // Notify after unlocking so the woken worker does not immediately
        // block on the mutex.
        m_cond.notify_one();
        return true;
    }

    /// @brief Prints the number of requests currently waiting in the queue.
    void printQueueSize()
    {
        std::size_t pending = 0;
        {
            std::lock_guard<std::mutex> guard(queueLock);
            pending = workQueue.size();
        }
        std::cout << pending << std::endl;
    }

private:
    /// @brief Worker loop: waits for a request, processes it, repeats until
    ///        the pool is stopped.
    /// @param threadId Index of this worker, used only for the log prefix.
    void run(int threadId)
    {
        for (;;)
        {
            T *request = nullptr;
            {
                std::unique_lock<std::mutex> lk(queueLock);
                // Also wake on m_stop, otherwise shutdown would block forever
                // on an empty queue.
                m_cond.wait(lk, [this] { return m_stop || !workQueue.empty(); });
                if (m_stop)
                {
                    return;
                }
                request = workQueue.front();
                workQueue.pop_front();
            }
            // Process outside the lock so workers actually run in parallel and
            // producers are not blocked. As a consequence this prefix and the
            // output of process() may interleave with other workers' output.
            std::cout << "thread" << threadId << " process ";
            request->process();
            ++nums;
        }
    }

    /// @brief Requests all workers to stop and joins them. Safe to call more
    ///        than once.
    void shutdown() noexcept
    {
        {
            std::lock_guard<std::mutex> guard(queueLock);
            m_stop = true;
        }
        m_cond.notify_all();
        for (std::thread &t : threads)
        {
            if (t.joinable())
            {
                t.join();
            }
        }
    }

private:
    int _threadSize;                  // number of worker threads
    int _queueMaxsize;                // maximum number of pending requests
    std::vector<std::thread> threads; // worker threads, joined in shutdown()
    std::mutex queueLock;             // guards workQueue and m_stop
    std::condition_variable m_cond;   // signalled on new work and on stop
    std::list<T *> workQueue;         // pending, non-owning request pointers
    bool m_stop;                      // true once shutdown has been requested
};

template <class T>
std::atomic<int> Threadpool<T>::nums{0};
#endif // THREADPOOL
test.cpp
#include<iostream>
#include<string>
#include "threadpool.h"
/// @brief Minimal job type for the thread-pool demo: carries a label and
///        prints it when processed.
class Request
{
public:
    /// @param s Label printed by process().
    explicit Request(std::string s) : _s(s) {}

    /// @brief Prints the label followed by a newline and flushes stdout so the
    ///        line appears immediately even when called from a worker thread.
    void process() const
    {
        // "%s\n": the newline escape is required to get one job per line.
        printf("%s\n", _s.c_str());
        fflush(stdout);
    }

    // Left public so callers can inspect the label directly.
    std::string _s;
};
int main()
{
Threadpool<Request> threadpool(10,100);
for(int i =0; i<100; i++)
{
std::string str = std::string("job"+std::to_string(i));
//std::cout<<str<<std::endl;
Request r1 (str);
//r1.process();
threadpool.append(&r1);
//std::this_thread::sleep_for(std::chrono::nanoseconds(50));
}
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout<< Threadpool<Request>::nums<<std::endl;
// std::cout<<"fuck"<<std::endl
threadpool.printQueueSize();
return 0;
}
我想创建一个线程池，并依次打印 job0 到 job99。然而，结果并不是我所期望的，输出如下所示：
thread0 process job2
thread0 process job2
thread1 process job5
thread1 process job5
thread1 process job5
thread3 process job6
thread6 process job9
thread6 process job9
thread6 process job9
thread9 process job14
thread9 process job14
thread9 process job14
thread9 process job14
thread9 process job14
thread0 process job15
thread6 process job19
thread6 process job19
thread6 process job19
thread6 process job19
thread8 process job21
thread4 process job21
thread8 process job22
thread3 process job23
thread1 process job25
thread1 process job25
thread6 process job28
thread6 process job28
thread6 process job28
thread6 process job28
thread4 process job30
thread8 process job31
thread9 process job32
thread0 process job35
thread0 process job35
thread0 process job35
thread6 process job37
thread6 process job37
thread4 process job39
thread4 process job39
thread2 process job41
thread2 process job41
thread0 process job45
thread0 process job45
thread5 process job45
thread5 process job45
thread5 process job45
thread5 process job46
thread3 process job47
thread0 process job49
thread8 process job50
thread2 process job51
thread9 process job53
thread9 process job53
thread5 process job54
thread6 process job55
thread1 process job56
thread4 process job57
thread3 process job58
thread0 process job59
thread8 process job60
thread2 process job61
thread9 process job62
thread5 process job63
thread7 process job64
thread6 process job65
thread1 process job66
thread4 process job67
thread3 process job68
thread0 process job69
thread8 process job70
thread2 process job71
thread9 process job72
thread5 process job73
thread7 process job74
thread6 process job75
thread1 process job76
thread3 process job78
thread3 process job78
thread8 process job80
thread8 process job80
thread9 process job82
thread9 process job82
thread9 process job82
thread9 process job83
thread7 process job84
thread1 process job87
thread1 process job87
thread0 process job89
thread0 process job89
thread0 process job89
thread2 process job91
thread9 process job93
thread9 process job93
thread7 process job96
thread7 process job96
thread7 process job96
thread3 process job98
thread3 process job98
thread2 process job99
thread2 process job99
100
0
我已经检查了队列的大小，它是空的。这是 stdout 缓冲导致的问题，还是我的线程池实现有错？
您的代码中存在未定义行为（UB）：您把局部变量 r1 的地址传给了线程池，但该变量在每次循环迭代结束时就已超出作用域，之后工作线程再通过这个指针访问它就是未定义行为。
由于编译器很可能会重用 r1 的存储空间，process 中的 printf("%s\n",_s.c_str()); 读到的将是循环此刻正在写入的值（即数据竞争）。这就是为什么你会看到一些重复的值。不过你很幸运，这里没有发生段错误。再次强调，这一切都属于未定义行为。
我建议您将 Request 对象直接（按值）存储在 workQueue 中，并让 append 使用移动语义接收请求。
无论何时处理多线程代码，都建议使用 ThreadSanitizer（线程检测工具）——在 gcc 或 clang 的编译选项中加入 -fsanitize=thread 即可。对于这类 bug 来说，它确实是一个很有价值的工具。