并行数据包调度程序中的瓶颈



我会提前说需要巨大的速度,并且调用ExecutePackets非常昂贵。ExecutePackets函数必须并行处理来自不同线程的多个包。

struct Packet {
bool responseStatus;
char data[1024];
};
struct PacketPool {
int packet_count;
Packet* packets[10];
}packet_pool;
std::mutex queue_mtx;
std::mutex request_mtx;
bool ParallelExecutePacket(Packet* p_packet) {
p_packet->responseStatus = false;
struct QueuePacket {
bool executed;
Packet* p_packet;
}queue_packet{ false, p_packet };
static std::list<std::reference_wrapper<QueuePacket>> queue;
//make queue
queue_mtx.lock();
queue.push_back(queue_packet);
queue_mtx.unlock();
request_mtx.lock();
if (!queue_packet.executed)
{
ZeroMemory(&packet_pool, sizeof(packet_pool));

//move queue to pequest_pool and clear queue
queue_mtx.lock();
auto iter = queue.begin();
while (iter != queue.end())
if (!(*iter).get().executed)
{
int current_count = packet_pool.packet_count++;
packet_pool.packets[current_count] = (*iter).get().p_packet;
(*iter).get().executed = true;
queue.erase(iter++);
}
else ++iter;
queue_mtx.unlock();

//execute packets
ExecutePackets(&packet_pool);
}
request_mtx.unlock();
return p_packet->responseStatus;
}

ParallelExecutePacket函数可以同时从多个循环中调用。我想把数据包一批一批地处理。更准确地说,这样每个线程都可以处理整个队列。然后,ExecutePackets的数量将减少,同时不会丢失已处理的数据包的数量。

但是,在我的具有多个线程的代码中,处理的数据包总数等于一个线程处理的数据包包数。我不明白为什么会发生这种事。

在我的测试中,我创建了几个线程,在每个线程中都在一个循环中称为ParallelExecutePacket。结果是每秒处理的请求数。

多线程:

Summ:91902
Thread 0 : 20826
Thread 1 : 40031
Thread 2 : 6057
Thread 3 : 12769
Thread 4 : 12219

单线程:

Summ:104902
Thread 0 : 104902

如果我的版本不工作,如何实现我需要的?

queue_mtx.lock();
auto iter = queue.begin();
while (iter != queue.end())
queue.erase(iter++);

queue_mtx.unlock();

一次只有一个执行线程锁定队列,从中排出所有消息,然后解锁它。即使这里有一千个执行线程可用,其中也只有一个线程能够完成任何工作。所有其他人都会被阻止。

保持queue_mtx的时间长度必须尽可能短,它应该不超过从队列中提取一条消息、完全删除它,然后在完成所有实际工作时解锁队列所需的绝对最小值。

int current_count = packet_pool.packet_count++;
packet_pool.packets[current_count] = (*iter).get().p_packet;

这似乎就是这里所做工作的范围。目前,显示的代码享有queue_mtx保护的好处。如果这不再受到它的保护,那么如果需要的话,线程安全必须在这里以其他方式实现(目前尚不清楚这是什么,也不清楚这里是否存在线程同步问题(。

在while循环期间,您永远不会丢弃request_mtx。while循环包括ExecutePackets,因此线程会阻塞所有其他线程,直到它完成执行它找到的所有任务。

还要注意的是,您实际上不会看到这种并行风格带来的任何加速。要使n线程与此代码并行,需要让n调用方调用ParallelExecutePacket。如果你只让每一个单独工作,就会出现完全相同的并行性。事实上,从统计数据来看,你会发现几乎总是每个线程都只运行自己的任务。不时会出现线程争用,导致一个线程执行另一个线程的任务。当这种情况发生时,两个线程的速度都会降低到两者中较慢的一个。

相关内容

最新更新