我目前正在尝试多线程创建一个树,其中保存树的类在特定大小(概念上大小是任意的(的块中预先分配Node
的std::vector
。只有在必要的时候才会创建额外的Node
块,这是因为树很快就会变得很大,我想避免为了时间效率而不断使用new
运算符。
这些Node
s的矢量定义为:std::vector< std::vector< Node > > nodes
CCD_ 7保持跟踪内部矢量内的位置并且CCD_ 8保持跟踪当前正在使用哪个外部矢量。
向量在构造函数中的大小调整为:
nodes.resize( 1 );
nodes[chunkCount].resize( CHUNK_SIZE );
Node
的简化版本是:
typedef struct Node {
int val;
Node* subnodes[5];
} Node;
创建新的Node
如下:
void TreeClass::createNode( Node* node, short index, int val )
{
omp_set_lock( &treeLock ); // treeLock belongs to TreeClass
head++;
if( head == CHUNK_SIZE ) {
std::vector< Node > tempNodeVec( CHUNK_SIZE );
nodes.push_back( tempNodeVec );
chunkCount++;
head = 0;
}
node->subnodes[index] = &( nodes[chunkCount][head] );
omp_unset_lock( &treeLock );
node->subnodes[index]->val = val;
}
这非常好用。然而,我担心的是,在创建节点时,除了一个线程外,其他所有线程都会被阻塞,这种情况经常发生,因此需要花费大量时间来阻塞或锁定/解锁treeLock
,因此我希望使该函数无锁,但到目前为止我的尝试都失败了。
在没有使用#pragma omp atomic
(或通过使用std::atomic< int >
s(的锁定的情况下,更改head
和chunkCount
是很容易的,但这是确保if( ... )
语句只执行一次以及在任何线程继续分配子级地址之前执行的逻辑,即确保它们使用正确/更新的chunkCount
和head
。
阅读关于无锁定算法的一个想法是在Node
中使用std::atomic< Node* > subnodes[5]
,并执行CAS操作,等待正确更新的head
和chunkCnt
,但不知道什么是"正确的",我怎么知道我在等待什么?
另一个(天真的(想法是:
int myHead;
if( ++head == CHUNK_SIZE ) {
std::vector< Node > tempNodeVec( CHUNK_SIZE );
nodes.push_back( tempNodeVec );
chunkCount++;
myhead = head = 0;
} else {
myhead = head;
while( head > CHUNK_SIZE )
myHead = ++head;
}
node->subnodes[index] = &( nodes[chunkCount][myHead] );
这个想法是,只有一个线程进入if( ... )
,直到它将head
设置为0,其余线程将被困在else { ... }
中,但我已经看到了这种方法的许多问题。
如有任何帮助,我们将不胜感激。
我建议您使用线程专用内存池。为此,您可以使用以下注释:
#pragma omp threadprivate(nodes)
这不仅比试图保护对共享内存池的访问要简单得多,而且由于数据的局部性,它的性能可能会更好。
注意:在解决方案中基于原子论实现无锁定是不可能的,因为每次分配都需要nodes[chunkCount]
,必须始终保护其不受nodes.push_back
的影响。
全功能内存池更为复杂,但作为一个小步骤,您可以尝试使用std::deque
。它在不干扰两个向量的情况下提供了所需的内容——在恒定时间内插入元素,同时不使现有元素的指针无效。你的控制力较弱,但这是一个良好的开端。