根据intel博客实现concurrent_vector



我正在尝试实现一个线程安全的无锁容器,类似于std::vector,根据这个https://software.intel.com/en-us/blogs/2008/07/24/tbbconcurrent_vector-secrets-of-memory-organization

据我所知,为了防止重新分配和使所有线程上的所有迭代器无效,它们添加了新的连续块,而不是单个连续数组
他们添加的每个块的大小都是2的乘方,因此他们可以使用log(index)来找到[index]中的项目应该所在的适当分段。

据我所知,他们有一个指向分段的静态指针数组,所以他们可以快速访问它们,但他们不知道用户想要多少分段,所以他们制作了一个小的初始分段,如果分段数量超过当前计数,他们会分配一个大的分段,然后切换到使用该分段。

问题是,添加一个新的段不能以无锁线程安全的方式完成,或者至少我还没有弄清楚如何完成。我可以原子级地增加当前大小,但仅限于此
而且从小数组切换到大数组的段指针涉及到大量的分配和内存拷贝,所以我不明白他们是怎么做到的。

他们在网上发布了一些代码,但所有重要的函数都没有可用的源代码,它们都在线程构建块DLL中。以下是一些演示问题的代码:

template<typename T>
class concurrent_vector
{
private:
int size = 0;
int lastSegmentIndex = 0;
union
{
T* segmentsSmall[3];
T** segmentsLarge;
};
void switch_to_large()
{
//Bunch of allocations, creates a T* segmentsLarge[32] basically and reassigns all old entries into it
}
public:
concurrent_vector()
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8)
{
switch_to_large(); //This is the problem part, there is no possible way to make this thread-safe without a mutex lock. I don't understand how Intel does it. It includes a bunch of allocations and memory copies.
}
InterlockedIncrement(&size); //Ok, so size is atomically increased
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};

我不会试图使segmentsLargesegmentsSmall成为并集。是的,这又浪费了一个指针。然后指针,让我们称之为segments,最初可以指向segmentsSmall。

另一方面,其他方法总是可以使用相同的指针,这使它们更简单。

从小到大的切换可以通过一个指针的比较交换来实现。

我不知道有了工会怎么能安全地做到这一点。

这个想法看起来是这样的(请注意,我使用的是C++11,英特尔库早于它,所以他们很可能是用原子内部函数实现的)。这可能遗漏了相当多的细节,我相信英特尔的人已经考虑得更多了,所以你可能必须将其与所有其他方法的实现进行比较。

#include <atomic>
#include <array>
#include <cstddef>
#include <climits>
template<typename T>
class concurrent_vector
{
private:
std::atomic<size_t> size;
std::atomic<T**> segments;
std::array<T*, 3> segmentsSmall;
unsigned lastSegmentIndex = 0;
void switch_to_large()
{
T** segmentsOld = segments;
if( segmentsOld == segmentsSmall.data()) {
// not yet switched
T** segmentsLarge = new T*[sizeof(size_t) * CHAR_BIT];
// note that we leave the original segment allocations alone and just copy the pointers
std::copy(segmentsSmall.begin(), segmentsSmall.end(), segmentsLarge);
for(unsigned i = segmentsSmall.size(); i < numSegments; ++i) {
segmentsLarge[i] = nullptr;
}
// now both the old and the new segments array are valid
if( segments.compare_exchange_strong(segmentsOld, segmentsLarge)) {
// success!
return;
}  else {
// already switched, just clean up
delete[] segmentsLarge;
}
}
}
public:
concurrent_vector()  : size(0), segments(segmentsSmall.data())
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8) {
switch_to_large();
}
// here we may have to allocate more segments atomically
++size;
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};

相关内容

  • 没有找到相关文章

最新更新