C++ vector of threads crashes randomly on join



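I'm using a vector of threads to do some heavy work, and afterwards I call join on them. Sometimes everything works and they join as expected. In other cases, seemingly at random, the program crashes with an error saying that the vector iterators are from different containers.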

Here is the function where I do the multithreading.

int FindPath(const int nStartX, const int nStartY,
    const int nTargetX, const int nTargetY,
    const unsigned char* pMap, const int nMapWidth, const int nMapHeight,
    int* pOutBuffer, const int nOutBufferSize)
{
    vector<Node> nodes(nMapWidth * nMapHeight);
    priority_queue<Node*, vector<Node*>, Compare> queue;
    vector<thread> threads;
    getNodes(nodes, nStartX, nStartY, nTargetX, nTargetY, pMap, nMapWidth, nMapHeight);
    queue.push(&nodes[getCoord(nMapWidth, nStartX, nStartY)]);
    for (auto i = 0; i < thread::hardware_concurrency(); ++i)
    {
        threads.push_back(thread(doWork, ref(queue)));
    }
    for (auto& worker : threads)
    {
        worker.join();
    }
    if (nodes[getCoord(nMapWidth, nTargetX, nTargetY)].prev)
    {
        vector<int> path;
        getPath(path, nodes[getCoord(nMapWidth, nTargetX, nTargetY)]);
        for (auto i = 0; i < nOutBufferSize; ++i)
        {
            if (i >= path.size())
            {
                break;
            }
            else
            {
                pOutBuffer[i] = path[i];
            }
        }
        return path.size();
    }
    else
    {
        return -1;
    }
}

This part in particular is where the crash occurs at random.

for (auto& worker : threads)
{
    worker.join();
}

void doWork(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    while (true)
    {
        if (!queue.size())
        {
            unique_lock<mutex> ml(mtx);
            cv.wait_until(ml, chrono::system_clock::now() + 10ms);
            if (!queue.size())
            {
                break;
            }
        }
        else
        {
            Node* node = queue.top();
            queue.pop();
            for (auto neighb : node->neighb)
            {
                if (node->distPrev + neighb.second < neighb.first->distPrev)
                {
                    neighb.first->distPrev = node->distPrev + neighb.second;
                    neighb.first->prev = node;
                    queue.push(neighb.first);
                    cv.notify_one();
                }
            }
        }
    }
}

In case it helps, I'm using Visual Studio 2019 Community edition.

You need to synchronize all reads and writes to the queue.

Something like this (untested, for obvious reasons):

void doWork(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    while (true)
    {
        Node* node = nullptr;
        {
            // Wait for an item to appear, or 10 ms to pass.
            unique_lock<mutex> ml(mtx);
            if (queue.empty())
            {
                // If the queue is still empty after 10ms, break out.
                if (!cv.wait_for(ml, 10ms, [&queue]() { return !queue.empty(); }))
                    break;
            }
            // The queue can't be empty here.
            node = queue.top();
            queue.pop();
        }
        // Add neighbours.
        for (auto neighb : node->neighb)
        {
            if (node->distPrev + neighb.second < neighb.first->distPrev)
            {
                neighb.first->distPrev = node->distPrev + neighb.second;
                neighb.first->prev = node;
                // Lock while adding to the queue.
                unique_lock<mutex> ml(mtx);
                queue.push(neighb.first);
                cv.notify_one();
            }
        }
    }
}

Note that "the queue is still empty after waiting ten milliseconds" is not a very reliable way to determine that the work is finished.

Or split it into two functions:

Node* fetch_next(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    unique_lock<mutex> ml(mtx);
    if (queue.empty())
    {
        if (!cv.wait_for(ml, 10ms, [&queue]() { return !queue.empty(); }))
            return nullptr;
    }
    Node* node = queue.top();
    queue.pop();
    return node;
}

void doWork(priority_queue<Node*, vector<Node*>, Compare>& queue)
{
    while (Node* node = fetch_next(queue))
    {
        for (auto neighb : node->neighb)
        {
            if (node->distPrev + neighb.second < neighb.first->distPrev)
            {
                neighb.first->distPrev = node->distPrev + neighb.second;
                neighb.first->prev = node;
                unique_lock<mutex> ml(mtx);
                queue.push(neighb.first);
                cv.notify_one();
            }
        }
    }
}
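
The 10 ms timeout used above as the "work is finished" signal can fire while another thread is still busy and about to push more items. One possible alternative, sketched here under assumptions, is to count how many workers are currently processing an item and treat "queue empty and nobody busy" as the end of the work. The names `WorkQueue`, `done` and `run_workers` are hypothetical and not part of the original code, and plain `int` items stand in for `Node*` to keep the sketch self-contained.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical queue that also tracks how many workers are busy.
class WorkQueue {
public:
    void push(int item) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            items_.push(item);
        }
        cv_.notify_one();
    }

    // Blocks until an item is available or all work is finished.
    // Returns false once the queue is empty and no worker is busy.
    bool pop(int& out) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return !items_.empty() || busy_ == 0; });
        if (items_.empty()) return false;
        out = items_.top();
        items_.pop();
        ++busy_;  // the caller is now processing this item
        return true;
    }

    // Must be called after an item is fully processed, including any
    // follow-up push() calls it caused.
    void done() {
        bool finished;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            --busy_;
            finished = (busy_ == 0 && items_.empty());
        }
        if (finished) cv_.notify_all();  // release every waiting worker
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::priority_queue<int> items_;
    int busy_ = 0;
};

// Illustrative workload: each item n > 0 produces two items n - 1.
// Returns the total number of items processed.
int run_workers(int seed, unsigned n_threads) {
    WorkQueue queue;
    std::atomic<int> processed{0};
    queue.push(seed);  // seed before starting workers, or pop() returns false at once
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < n_threads; ++i) {
        threads.emplace_back([&queue, &processed] {
            int item;
            while (queue.pop(item)) {
                ++processed;
                if (item > 0) {
                    queue.push(item - 1);
                    queue.push(item - 1);
                }
                queue.done();
            }
        });
    }
    for (auto& t : threads) t.join();
    return processed.load();
}
```

With this scheme a worker only exits when no item is queued and no other worker could still produce one, so the result no longer depends on how long a single item takes to process.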

My guess is that the following code in doWork() is executed by multiple threads concurrently, which causes the problem:

if (!queue.size()) 

In the else block, the queue is accessed as follows, again by multiple threads concurrently:

Node* node = queue.top();
queue.pop();

Protecting the code snippets above with a mutex would probably be a better idea.
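
As a minimal sketch of that idea, the emptiness check and the top()/pop() pair can be placed in a single critical section, so that no other thread can empty the queue between the check and the pop. The `LockedQueue` wrapper and its `try_pop` member are hypothetical names introduced for illustration; the original code uses a bare `priority_queue` with a global mutex instead.

```cpp
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical wrapper: every access to the underlying queue holds the mutex.
template <typename T>
class LockedQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mtx_);
        items_.push(std::move(value));
    }

    // Checks for emptiness and pops under one lock. Returns false if the
    // queue was empty, otherwise stores the top element in out.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (items_.empty()) return false;
        out = items_.top();
        items_.pop();
        return true;
    }

private:
    std::mutex mtx_;
    std::priority_queue<T> items_;
};
```

Hiding the queue behind a small interface like this makes it harder to touch it accidentally without holding the lock, which is what happens with `queue.size()`, `queue.top()` and `queue.pop()` in the original doWork.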
