以原子方式取消排队的最简单方法



我有一组数据必须使用多线程同时处理,数据的数量应该大于线程的数量。我决定将数据放入某种队列中,以便每个空闲线程都可以弹出其部分并对其进行处理,直到队列为空。当我想将元素从中取消排队时,我可以使用简单的 STL 队列并通过互斥锁锁定它,但我想尝试一种无锁方法。同时我的项目太小,无法依赖一些提供无锁结构的第三方库,实际上我只需要原子出列。所以我决定基于一个带有指向"head"的指针的向量实现我自己的队列,并以原子方式递增这个指针:

template <typename T>
class AtomicDequeueable
{
public:
    // Assumption: data vector never changes
    AtomicDequeueable(const std::vector<T>& data) :
        m_data(data),
        m_pointer(ATOMIC_VAR_INIT(0))
    {}
    const T * const atomicDequeue()
    {
        if (std::atomic_load(&m_pointer) < m_data.size())
        {
            return &m_data
            [
                std::atomic_fetch_add(&m_pointer, std::size_t(1))
            ];
        }
        return nullptr;
    }
private:
    AtomicDequeueable(const AtomicDequeueable<T>&) {}
    std::atomic_size_t m_pointer;
    const std::vector<T>& m_data;
};

线程的函数如下所示:

void f(AtomicDequeueable<Data>& queue)
{
    while (auto dataPtr = queue.atomicDequeue())
    {
        const Data& data = *dataPtr;
        // processing data...
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

使用无锁结构和基元的经验真的很差,所以我想知道:我的方法会正常工作吗?当然,我已经在 Ideone 上对其进行了测试,但我不知道它如何处理真实数据。

目前,您的atomicDequeue函数存在数据竞赛:在执行第二个指令之前,2 个线程可以同时执行第一条atomic指令。但是,这可以修复,因为您实际上只需要 1 个原子操作,如以下更改所示:

const T * const atomicDequeue()
{
    auto myIndex = std::atomic_fetch_add(&m_pointer, std::size_t(1));
    if(myIndex >= m_data.size())
        return nullptr;
    return &m_data[myIndex];
}

只要在线程操作期间不修改输入向量,这就可以工作。

你的代码非常有问题。让我在这里非常坦率地提出建议:

  • "阿加斯: 不要做已经做过的事情。"   您有大量预先存在的C++类可供您使用,这些类可以可靠地执行队列和进程间通信 (IPC)。 使用其中之一。

  • 不要担心"无锁"。这就是锁的用途,它们可靠、快速且便宜。

你"使用队列"的概念是合理的,但你要做不必要的工作......并创建错误...当你可以简单地"从货架上拿东西"时。您知道标准部件将正常工作,因为其他人已经对其进行了死亡测试。

相关内容

  • 没有找到相关文章

最新更新