锁定免费迭代/索引交错数组



对于加速度数据结构的并行工作,我目前使用SpinLock但想设计无锁算法。 数据结构是一个交错数组,其中每个内部数组具有不同的大小。 工作线程应该获取内部数组中的下一个元素,递增索引,如果索引更大,则切换到外部数组中的下一个索引:

for(int i = 0; i < arr.Length; ++i)
{
for(int j = 0; j < arr[i].Length; ++j)
{
DoWork(arr[i][j]);
}
}

我想不出一种方法可以无锁,除了增加共享索引,然后汇总数组的长度:

int sharedIndex = -1;
// -- In the worker thread ---------------------
bool loop = false;

do
{
int index = Interlocked.Increment(ref sharedIndex);
int count = 0;
loop = false;

for(int i = 0; i < arr.Length; ++i)
{
count += arr[i].Length;

if(count > index)
{
var remaining = index - (count - arr[i].Length);
DoWork(arr[i][remaining]);
loop = true;
break;
}
}
} while(loop);

有没有办法不必遍历整个外部数组并且仍然保持无锁状态? 因为我不能同时递增两个索引(对于外部和内部索引)。

您能否通过让每个线程在同步步骤之间执行一到四次外部迭代来划分工作? 如果outer_size / chunk_size / threads至少为 4 个左右(或者可能大于最短和最长内部数组之间的预期比率),则工作调度足够动态,通常应避免让一个线程在很长的数组上长时间运行,而其他线程都已完成。

(如果 1 行(又名内部数组)的块大小足够粗以提高效率,您可以简单地这样做。 你说DoWork太慢了,即使是单个元素的共享计数器也可能不是问题)

如果最后一个内部数组比其他内部数组长,这可能仍然是一个风险。 根据这种情况的常见程度和/或避免最坏情况的重要性,您可以提前查看内部大小并对其进行排序或分区以首先开始处理最长的内部数组,因此最后线程完成之间的差异是较短数组的长度差异。 (例如,实时,与面向吞吐量的用例相比,限制最坏情况比加快平均值更重要。 此外,如果您没有完美地安排这一点,那么其他线程可以使用空闲 CPU 内核执行任何有用的操作。


原子地递增每个内部元素的共享计数器将序列化该元素上的所有线程,因此除非处理每个内部元素非常昂贵,否则它将比没有同步的单线程慢得多

我假设您不需要按顺序开始处理每个元素,因为即使是共享计数器也无法保证这一点(一个线程在递增后可以休眠,另一个线程在之后启动元素)。


如果您要搜索,请从之前的位置开始。

如果您确实想使用单个共享计数器,而不是每次都从外部数组的开头进行线性搜索,而只从之前的位置进行搜索。 共享计数器是单调增加的,因此下一个位置通常会在这一行的后面,或者进入下一行。 这样做应该比每次都从头开始搜索更有效。

例如,保留 3 个变量:prev_indexprev_i, prev_j。 如果j = prev_j + (index - prev_index)仍在当前数组中,则已完成。 这可能是常见情况。 否则,请移动到下一行并通过减去arr[i].Length重新计算,直到您有一个j是该i的边界。

Theodor Zoulias建议预先计算一个数组,其中包含长度的运行总数(也称为前缀和)。 好主意,但是从上一个位置搜索可能没有必要,除非您的行通常很短并且有很多线程。 在这种情况下,每个步骤可能涉及多行,因此您可以更有效地从运行总计数组中的先前位置进行线性搜索。


每行位置计数器:其他线程可以帮助完成一行长行

如果仅按行在线程之间划分工作不够细粒度,您仍然可以主要这样做(争用率较低),但创建一种方法,让线程返回并在没有更多新行时帮助处理未完成的长行。

因此,您按照我的建议开始,每个线程通过共享计数器声明一整行。 当它到达行的末尾时,原子fetch_add获取要处理的新行。一旦新行用完,线程可以返回并查找带有arr[i].work_pos < arr[i].length的数组。

在每一行中,您都有一个包含数组本身(记录长度)的结构,一个原子电流位置计数器,以及另一个原子计数器,用于当前处理此子数组的线程数。

在处理内部数组时,线程以原子方式递增该内部数组的数组内位置计数器,将其用作下一个DoWork的位置。 因此,它仍然是每个DoWork调用之间的完整内存屏障(或一次展开以声明 2 个,然后执行它们),但在总运行时间的大部分时间内,争用大大减少,因为这将是唯一增加该计数器的线程。 (直到后来线程跳入并开始提供帮助)

当我们必须从另一个内核请求它时,缓存行上的原子 RMW 在此内核的 L1d 缓存中保持热度比行上的原子 RMW 便宜得多。 因此,我们希望单独分配每行结构,理想情况下与行数据连续,如在 Cstruct { _Atomic size_t work_pos; size_t len; atomic_int thread_active; struct work arr[]; };与"灵活的数组成员"(因此任意长度的数组与结构的末尾相邻),或者另一个级别的间接寻址只是有一个指向数组的指针/引用。 或者,如果您可以使用整数数组的前 2 个元素进行原子簿记,那也可以。外部数组应该是对这些结构的引用数组,而不是按值,其中多个控制块将共享一个缓存行。 虚假共享与真实共享一样糟糕。 如果DoWork线程对足够慢,以至于通常只有一个内核在飞行中只有一个请求,那么让成对的线程相互争用几乎和所有线程争夺同一个计数器一样糟糕。

然后有趣的部分出现在最后,当行索引上的互锁.增量返回一个超过末尾的索引时。 然后,该线程必须找到一个正在进行的行来提供帮助。 理想情况下,这甚至可以分布在仍在工作的线程上。

也许我们应该有一个数组来记录每个线程正在处理哪一行,每个线程都有一个条目? 因此,寻找帮助位置的线程可以扫描该位置并找到具有最高work_left / threads_working的数组。 (这就是为什么我建议在控制块中使用线程计数成员)。 指向此数组的指针/引用的原子存储中的竞赛与一次读取一个条目的读者不是问题;如果一个数组几乎完成了,我们无论如何都不想选择它,我们会找到一个有用的地方加入。

如果你天真地从外部数组的末尾向后搜索,新的线程将堆积到最后一行不完整的行,即使它几乎完成了,并为其原子计数器产生大量争用。 但是您也不想每次都搜索整个外部数组,如果它可能很大的话。 (如果没有,如果行很长但行不多,那很好。

读取另一个线程正在使用的原子work_pos计数器将干扰该线程,因为它失去了独占所有权,因此其下一个 Interlocked.Increment 将变慢。 因此,我们希望避免线程需要过于频繁地查找新行来跳转。

如果我们有一个很好的启发式方法让他们说一行看起来"足够好"并立即跳入,而不是每次都查看所有活动/不完整的行,这可以减少争用。 但前提是它是一个足够好的启发式方法,可以做出正确的选择。

减少争用的另一种方法是尽量减少线程到达行尾的频率。 选择较大的work_left / threads_working应该可以实现这一点,因为这应该是最后完成哪一行的体面近似值。

同时选择多个线程可能会选择同一行,但我认为我们不能完美(否则会太贵)。 当他们使用 Interlocked.Increment 将自己添加到处理此行的线程数时,我们可以检测到这一点。 回退到第二长的估计时间行可能是合适的,或者检查这是否仍然是具有额外工作人员的估计最慢行。

这不一定是完美的;这一切都只是在事情结束时的清理,在大多数时候以最小的争用运行之后。 只要DoWork相对于线程间延迟不是太便宜,如果我们有时有更多的争用,它就不是灾难。

也许您还希望线程在所有工作完成之前停止自身可能很有用,如果 CPU 内核可以做其他事情。 (或者对于此线程,在工作线程池中执行。

您可以通过对预先计算的数组进行二进制搜索来优化当前算法,该数组包含截至此索引的所有数组的累积长度。例如,如果你有一个由 10 个长度为8, 9, 5, 4, 0, 0, 6, 4, 4, 7的内部数组组成的交错数组,那么预先计算的数组将包含值0, 8, 17, 22, 26, 26, 26, 32, 36, 40。执行二叉搜索将使您直接进入与您正在搜索的index对应的内部数组,仅执行 O(Log n) 比较。

以下是这个想法的实现:

// --- Preparation ------------------------------
int[] indices = new int[arr.Length];
indices[0] = 0;
for (int i = 1; i < arr.Length; i++)
indices[i] = indices[i - 1] + arr[i - 1].Length;
int sumLength = arr.Sum(inner => inner.Length);
int sharedIndex = -1;
// --- In the worker thread ---------------------
while (true)
{
int index = Interlocked.Increment(ref sharedIndex);
if (index >= sumLength) break;
int outerIndex = Array.BinarySearch(indices, index);
if (outerIndex < 0) outerIndex = (~outerIndex) - 1;
while (arr[outerIndex].Length == 0) outerIndex++; // Skip empty arrays
int innerIndex = index - indices[outerIndex];
DoWork(arr[outerIndex][innerIndex]);
}

最新更新