我会提前说需要巨大的速度,并且调用ExecutePackets
非常昂贵。ExecutePackets
函数必须并行处理来自不同线程的多个包。
struct Packet {
bool responseStatus;
char data[1024];
};
struct PacketPool {
int packet_count;
Packet* packets[10];
}packet_pool;
std::mutex queue_mtx;
std::mutex request_mtx;
bool ParallelExecutePacket(Packet* p_packet) {
p_packet->responseStatus = false;
struct QueuePacket {
bool executed;
Packet* p_packet;
}queue_packet{ false, p_packet };
static std::list<std::reference_wrapper<QueuePacket>> queue;
//make queue
queue_mtx.lock();
queue.push_back(queue_packet);
queue_mtx.unlock();
request_mtx.lock();
if (!queue_packet.executed)
{
ZeroMemory(&packet_pool, sizeof(packet_pool));
//move queue to pequest_pool and clear queue
queue_mtx.lock();
auto iter = queue.begin();
while (iter != queue.end())
if (!(*iter).get().executed)
{
int current_count = packet_pool.packet_count++;
packet_pool.packets[current_count] = (*iter).get().p_packet;
(*iter).get().executed = true;
queue.erase(iter++);
}
else ++iter;
queue_mtx.unlock();
//execute packets
ExecutePackets(&packet_pool);
}
request_mtx.unlock();
return p_packet->responseStatus;
}
ParallelExecutePacket
函数可以同时从多个循环中调用。我想把数据包一批一批地处理。更准确地说,这样每个线程都可以处理整个队列。然后,ExecutePackets
的数量将减少,同时不会丢失已处理的数据包的数量。
但是,在我的具有多个线程的代码中,处理的数据包总数等于一个线程处理的数据包包数。我不明白为什么会发生这种事。
在我的测试中,我创建了几个线程,在每个线程中都在一个循环中称为ParallelExecutePacket。结果是每秒处理的请求数。
多线程:
Summ:91902 Thread 0 : 20826 Thread 1 : 40031 Thread 2 : 6057 Thread 3 : 12769 Thread 4 : 12219
单线程:
Summ:104902 Thread 0 : 104902
如果我的版本不工作,如何实现我需要的?
queue_mtx.lock();
auto iter = queue.begin();
while (iter != queue.end())
queue.erase(iter++);
queue_mtx.unlock();
一次只有一个执行线程锁定队列,从中排出所有消息,然后解锁它。即使这里有一千个执行线程可用,其中也只有一个线程能够完成任何工作。所有其他人都会被阻止。
保持queue_mtx
的时间长度必须尽可能短,它应该不超过从队列中提取一条消息、完全删除它,然后在完成所有实际工作时解锁队列所需的绝对最小值。
int current_count = packet_pool.packet_count++;
packet_pool.packets[current_count] = (*iter).get().p_packet;
这似乎就是这里所做工作的范围。目前,显示的代码享有queue_mtx
保护的好处。如果这不再受到它的保护,那么如果需要的话,线程安全必须在这里以其他方式实现(目前尚不清楚这是什么,也不清楚这里是否存在线程同步问题(。
在while循环期间,您永远不会丢弃request_mtx
。while循环包括ExecutePackets
,因此线程会阻塞所有其他线程,直到它完成执行它找到的所有任务。
还要注意的是,您实际上不会看到这种并行风格带来的任何加速。要使n
线程与此代码并行,需要让n
调用方调用ParallelExecutePacket
。如果你只让每一个单独工作,就会出现完全相同的并行性。事实上,从统计数据来看,你会发现几乎总是每个线程都只运行自己的任务。不时会出现线程争用,导致一个线程执行另一个线程的任务。当这种情况发生时,两个线程的速度都会降低到两者中较慢的一个。