我使用一个线程向量(vector<thread>)来完成繁重的工作,之后对其中每个线程调用 join。有时一切正常,线程会按预期完成 join。然而在某些情况下(在我看来似乎是随机的),程序会崩溃,并提示 vector 迭代器来自不同的容器。
这是我进行多线程处理的函数。
int FindPath(const int nStartX, const int nStartY,
const int nTargetX, const int nTargetY,
const unsigned char* pMap, const int nMapWidth, const int nMapHeight,
int* pOutBuffer, const int nOutBufferSize)
{
vector<Node> nodes(nMapWidth * nMapHeight);
priority_queue<Node*, vector<Node*>, Compare> queue;
vector<thread> threads;
getNodes(nodes, nStartX, nStartY, nTargetX, nTargetY, pMap, nMapWidth, nMapHeight);
queue.push(&nodes[getCoord(nMapWidth, nStartX, nStartY)]);
for (auto i = 0; i < thread::hardware_concurrency(); ++i)
{
threads.push_back(thread(doWork, ref(queue)));
}
for (auto& worker : threads)
{
worker.join();
}
if (nodes[getCoord(nMapWidth, nTargetX, nTargetY)].prev)
{
vector<int> path;
getPath(path, nodes[getCoord(nMapWidth, nTargetX, nTargetY)]);
for (auto i = 0; i < nOutBufferSize; ++i)
{
if (i >= path.size())
{
break;
}
else
{
pOutBuffer[i] = path[i];
}
}
return path.size();
}
else
{
return -1;
}
}
特别是这部分,是随机发生崩溃的地方。
// Block until each worker thread in `threads` has finished.
for (auto& worker : threads)
{
worker.join();
}
// Worker loop run by each thread: takes the top node from the shared queue
// and, for every neighbour whose distPrev can be lowered by going through
// that node, updates the neighbour's distPrev/prev and pushes it onto the
// queue. The loop ends once the queue is seen empty both before and after a
// wait of up to 10 ms on cv.
// NOTE(review): apart from the second size() check, every access to `queue`
// in this function (size, top, pop, push) happens without holding mtx, so
// workers sharing the same queue race on it.
void doWork(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
while (true)
{
// Read of the shared queue without holding mtx.
if (!queue.size())
{
unique_lock<mutex> ml(mtx);
// Wait for a notification or for roughly 10 ms, whichever comes first.
cv.wait_until(ml, chrono::system_clock::now() + 10ms);
// Still nothing queued after the wait: this worker stops.
if (!queue.size())
{
break;
}
}
else
{
// top() and pop() are two separate calls made without holding mtx.
Node* node = queue.top();
queue.pop();
for (auto neighb : node->neighb)
{
// neighb.first is the neighbouring node, neighb.second the value added
// to this node's distPrev to reach it.
if (node->distPrev + neighb.second < neighb.first->distPrev)
{
neighb.first->distPrev = node->distPrev + neighb.second;
neighb.first->prev = node;
// push() is also made without holding mtx.
queue.push(neighb.first);
cv.notify_one();
}
}
}
}
}
如果有帮助的话,我使用的是 Visual Studio 2019 社区版。
您需要同步所有对队列的读取和写入。
大致如下(出于显而易见的原因,未经测试):
// Worker loop run by each thread on the shared queue.
//
// Each iteration takes the top node from `queue` under mtx, then relaxes its
// neighbours: a neighbour whose distPrev can be lowered by going through the
// node gets its distPrev/prev updated and is pushed onto the queue. The
// worker returns once the queue has stayed empty for 10 ms.
//
// Both the queue and the Node fields distPrev/prev are shared between
// workers, so every read and write of them happens while holding mtx.
// NOTE(review): "queue empty for 10 ms" remains a heuristic for detecting
// that all work is done; a worker may exit while others are still busy.
void doWork(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    while (true)
    {
        Node* node = nullptr;
        {
            // Wait for an item to appear, or 10 ms to pass. wait_for tests the
            // predicate before blocking, so a non-empty queue returns at once.
            unique_lock<mutex> ml(mtx);
            if (!cv.wait_for(ml, 10ms, [&queue]() { return !queue.empty(); }))
                break;
            // The queue can't be empty here.
            node = queue.top();
            queue.pop();
        }
        // Relax neighbours.
        // NOTE(review): node->neighb is iterated without the lock; this
        // assumes the neighbour lists are not modified while workers run.
        for (const auto& neighb : node->neighb)
        {
            bool pushed = false;
            {
                // The compare-and-update must be atomic with respect to other
                // workers, which may be relaxing the same neighbour (or
                // lowering node->distPrev) at the same time.
                unique_lock<mutex> ml(mtx);
                if (node->distPrev + neighb.second < neighb.first->distPrev)
                {
                    neighb.first->distPrev = node->distPrev + neighb.second;
                    neighb.first->prev = node;
                    queue.push(neighb.first);
                    pushed = true;
                }
            }
            // Notify after unlocking so the woken worker can take mtx at once.
            if (pushed)
                cv.notify_one();
        }
    }
}
请注意,"等待十毫秒后队列为空"并不是确定工作已完成的非常可靠的方法。
或拆分为两个函数:
// Removes and returns the top node of `queue`, holding mtx throughout.
// When the queue is empty, waits on cv for up to 10 ms for an item to show
// up; returns nullptr if it is still empty after that.
Node* fetch_next(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    unique_lock<mutex> guard(mtx);
    const auto hasWork = [&queue]() { return !queue.empty(); };

    // Only block when there is nothing to take right now.
    if (queue.empty() && !cv.wait_for(guard, 10ms, hasWork))
    {
        return nullptr;
    }

    Node* const next = queue.top();
    queue.pop();
    return next;
}
// Worker loop run by each thread on the shared queue.
//
// Pulls nodes via fetch_next() until it returns nullptr. For each node, every
// neighbour whose distPrev can be lowered by going through the node gets its
// distPrev/prev updated and is pushed onto the queue.
//
// The Node fields distPrev/prev are shared between workers just like the
// queue is, so the compare-and-update runs under mtx as well.
void doWork(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    while (Node* node = fetch_next(queue))
    {
        // NOTE(review): node->neighb is iterated without the lock; this
        // assumes the neighbour lists are not modified while workers run.
        for (const auto& neighb : node->neighb)
        {
            bool pushed = false;
            {
                // Another worker may be relaxing the same neighbour, or
                // lowering node->distPrev, at the same time.
                unique_lock<mutex> ml(mtx);
                if (node->distPrev + neighb.second < neighb.first->distPrev)
                {
                    neighb.first->distPrev = node->distPrev + neighb.second;
                    neighb.first->prev = node;
                    queue.push(neighb.first);
                    pushed = true;
                }
            }
            // Notify after unlocking so the woken worker can take mtx at once.
            if (pushed)
                cv.notify_one();
        }
    }
}
我的猜测是:doWork() 中的以下代码会被多个线程并发执行,因此存在问题:
// Reads the queue's size; no lock is taken in this excerpt.
if (!queue.size())
在 else 块中,队列按如下方式访问,同样由多个线程异步访问:
// Takes the top node and then removes it: two separate calls on the queue,
// with no lock taken in this excerpt.
Node* node = queue.top();
queue.pop();
把上述代码片段也放在互斥锁的保护之下,可能会更好。