使用shared_ptr时出现故障



我正在尝试使用shared_ptr在 C++ 中实现基于懒惰并发列表的集。我的理由是,unreachable nodes将在最后一shared_ptr自动释放。根据我的理解,shared_ptr's reference count上的递增和递减操作是原子的。这意味着只有引用节点的最后一shared_ptr才应该为该节点调用delete/free。我为多个线程运行了该程序,但是我的程序崩溃并出现错误double free called或只是分段错误(SIGSEGV)。我不明白这怎么可能。下面给出的是我的实现代码,方法名称表示其预期操作。

#include<thread>
#include<iostream>
#include<mutex>
#include<climits>
using namespace std;
class Thread
{
public:
std::thread t;  
};
int n=50,ki=100,kd=100,kc=100;`/*no of threads, no of inserts,deletes & searches*/`

class Node
{
public:
int key;
shared_ptr<Node> next;
bool marked;
std::mutex nodeLock;
Node() {
key=0;
next = nullptr;
marked = false;
}
Node(int k) {
key = k;
next = nullptr;
marked = false;
}
void lock() {
nodeLock.lock();
}
void unlock() {
nodeLock.unlock();
}
~Node()
{
}
};
class List {
shared_ptr<Node> head;
shared_ptr<Node> tail;
public:
bool validate(shared_ptr<Node> pred, shared_ptr<Node> curr) {
return !(pred->marked) && !(curr->marked) && ((pred->next) == curr);
}
List() {
head=make_shared<Node>(INT_MIN);
tail=make_shared<Node>(INT_MAX);
head->next=tail;
}
bool add(int key)
{
while(true)
{
/*shared_ptr<Node> pred = head;
shared_ptr<Node> curr = pred->next;*/
auto pred = head;
auto curr = pred->next;
while (key>(curr->key))
{
pred = curr;
curr = curr->next;
}
pred->lock();
curr->lock();
if (validate(pred,curr))
{
if (curr->key == key)
{
curr->unlock();
pred->unlock();
return false;
}
else
{
shared_ptr<Node> newNode(new Node(key));
//auto newNode = make_shared<Node>(key);
//shared_ptr<Node> newNode = make_shared<Node>(key);
newNode->next = curr;
pred->next = newNode;
curr->unlock();
pred->unlock();
return true;
}
}
curr->unlock();
pred->unlock();
}
}
bool remove(int key)
{
while(true)
{
/*shared_ptr<Node> pred = head;
shared_ptr<Node> curr = pred->next;*/
auto pred = head;
auto curr = pred->next;
while (key>(curr->key))
{
pred = curr;
curr = curr->next;
}
pred->lock();
curr->lock();
if (validate(pred,curr))
{
if (curr->key != key)
{
curr->unlock();
pred->unlock();
return false;
}
else
{
curr->marked = true;
pred->next = curr->next;
curr->unlock();
pred->unlock();
return true;
}
}
curr->unlock();
pred->unlock();
}
}
bool contains(int key) {
//shared_ptr<Node> curr = head->next;
auto curr = head->next;
while (key>(curr->key)) {
curr = curr->next;
}
return curr->key == key && !curr->marked;
}
}list;
void test(int curr)
{
bool test;
int time;
int val, choice;
int total,k=0;
total=ki+kd+kc;
int i=0,d=0,c=0;
while(k<total)
{
choice = (rand()%3)+1;
if(choice==1)
{
if(i<ki)
{
val = (rand()%99)+1;
test = list.add(val);
i++;
k++;
}
}
else if(choice==2)
{
if(d<kd)
{
val = (rand()%99)+1;
test = list.remove(val);
d++;
k++;
}
}
else if(choice==3)
{
if(c<kc)
{
val = (rand()%99)+1;
test = list.contains(val);
c++;
k++;
}
}
}
}
int main()
{
int i;
vector<Thread>thr(n);
for(i=0;i<n;i++)
{
thr[i].t = thread(test,i+1);
}
for(i=0;i<n;i++)
{
thr[i].t.join();
}
return 0;
}

我无法弄清楚上面的代码有什么问题。每次错误都不同,其中一些只是SEGFAULTS

pure virtual method called
terminate called without an active exception
Aborted (core dumped)

你能指出我在上面的代码中做错了什么吗?以及如何解决该错误?
编辑:增加了一个非常粗糙的test function,随机调用三个list methods。此外,线程数和每个操作的数量是全局声明的。粗糙的编程,但它重新创建了SEGFAULT。

问题是您没有对shared_ptr使用原子存储和加载操作。

确实,shared_ptr的控制块(参与特定共享对象所有权的每个shared_ptr都有一个指针)中的引用计数是原子的,但是,shared_ptr本身的数据成员不是。

因此,让多个线程每个线程都有自己的共享对象shared_ptr是安全的,但是一旦至少有一个线程使用非 const 成员函数,就让多个线程访问同一shared_ptr是无法保存的,这是您在重新分配next指针时所做的。

说明问题

让我们看一下libstdc++的shared_ptr实现的(简化和美化)复制构造函数:

shared_ptr(const shared_ptr& rhs)
: m_ptr(rhs.m_ptr),
m_refcount(rhs.m_refcount) 
{ }

在这里,m_ptr只是一个指向共享对象的原始指针,而m_refcount是一个执行引用计数并处理m_ptr指向对象的最终删除的类。

只是可能出错的一个例子:假设当前线程正在尝试确定列表中是否包含特定键。它从List::contains中的复制初始化auto curr = head->next开始。在它设法初始化curr.m_ptr操作系统调度程序决定此线程必须暂停并在另一个线程中调度后。

另一个线程正在删除head的后继者。由于head->next的 ref-count 仍然是 1(毕竟,线程 1 尚未修改head->next的 ref-count),当第二个线程完成删除节点时,它被删除。

然后一段时间后,第一个线程继续。它完成了curr的初始化,但由于m_ptr在线程 2 开始删除之前已经初始化,它仍然指向现在删除的节点。尝试比较时key > curr->key线程 1 将访问无效内存。

使用 std::

atomic_load 和 std::atomic_store 来防止此问题

std::atomic_loadstd::atomic_store通过在调用指针传入的shared_ptr的复制构造函数/复制赋值运算符之前锁定互斥锁来防止问题发生。如果从多个线程共享的所有读取和写入shared_ptr都是通过std::atomic_load/std::atomic_store或 resp. 永远不会发生一个线程只修改了m_ptr而不是另一个线程开始读取或修改同一shared_ptr时的引用计数。

在必要的原子载荷和存储下,List成员函数应如下所示:

bool validate(Node const& pred, Node const& curr) {
return !(pred.marked) && !(curr.marked) && 
(std::atomic_load(&pred.next).get() == &curr);
}
bool add(int key) {
while (true) {
auto pred = std::atomic_load(&head);
auto curr = std::atomic_load(&pred->next);
while (key > (curr->key)) {
pred = std::move(curr);
curr = std::atomic_load(&pred->next);
}
std::scoped_lock lock{pred->nodeLock, curr->nodeLock};
if (validate(*pred, *curr)) {
if (curr->key == key) {
return false;
} else {
auto new_node = std::make_shared<Node>(key);
new_node->next = std::move(curr);
std::atomic_store(&pred->next, std::move(new_node));
return true;
}
}
}
}
bool remove(int key) {
while (true) {
auto pred = std::atomic_load(&head);
auto curr = std::atomic_load(&pred->next);
while (key > (curr->key)) {
pred = std::move(curr);
curr = std::atomic_load(&pred->next);
}
std::scoped_lock lock{pred->nodeLock, curr->nodeLock};
if (validate(*pred, *curr)) {
if (curr->key != key) {
return false;
} else {
curr->marked = true;
std::atomic_store(&pred->next, std::atomic_load(&curr->next));
return true;
}
}
}
}
bool contains(int key) {
auto curr = std::atomic_load(&head->next);
while (key > (curr->key)) {
curr = std::atomic_load(&curr->next);
}
return curr->key == key && !curr->marked;
}

此外,您还应该Node::markedstd::atomic_bool.

最新更新