Processing a shared work queue using CUDA atomic operations and grid synchronization



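I'm trying to write a kernel whose threads iteratively process items in a work queue. My understanding is that I should be able to do this by using atomic operations to manipulate the work queue (i.e., grab work items from the queue and insert new work items into the queue), and by using grid synchronization via cooperative groups to ensure that all threads are at the same iteration (I make sure the number of thread blocks doesn't exceed the device capacity for the kernel). However, I sometimes observe that work items are skipped or processed multiple times during an iteration.

The following code is a working example that shows this. In this example, an array of size input_len is created, which holds work items 0 to input_len - 1. The processWorkItems kernel processes these items for max_iter iterations. Each work item can put itself and its previous and next work items in the work queue, but the marked array is used to ensure that, during an iteration, each work item is added to the work queue at most once. What should happen in the end is that the sum of the values in histogram equals input_len * max_iter, and no value in histogram is greater than 1. But I observe that occasionally both of these criteria are violated in the output, which implies that I'm not getting atomic operations and/or proper synchronization. I would appreciate it if someone could point out the flaws in my reasoning and/or implementation. My OS is Ubuntu 18.04, my CUDA version is 10.1, and I've run experiments on P100, V100, and RTX 2080 Ti GPUs and observed similar behavior.

The command I use to compile for the RTX 2080 Ti: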

nvcc -O3 -o atomicsync atomicsync.cu --gpu-architecture=compute_75 -rdc=true

Some inputs and outputs of runs on the RTX 2080 Ti:

./atomicsync 50 1000 1000
Skipped 0.01% of items. 5 extra item processing.
./atomicsync 500 1000 1000
Skipped 0.00% of items. 6 extra item processing.
./atomicsync 5000 1000 1000
Skipped 0.00% of items. 14 extra item processing.

atomicsync.cu:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <cooperative_groups.h>
#define checkCudaErrors(val) check ( (val), #val, __FILE__, __LINE__ )
template< typename T >
void check(T result, char const *const func, const char *const file, int const line)
{
    if (result)
    {
        fprintf(stderr, "CUDA error at %s:%d code=%d(%s) \"%s\" \n", file, line, static_cast<unsigned int>(result), cudaGetErrorString(result), func);
        cudaDeviceReset();
        exit(EXIT_FAILURE);
    }
}
__device__ inline void addWorkItem(int input_len, int item, int item_adder, int iter, int *queue, int *queue_size, int *marked) {
    int already_marked = atomicExch(&marked[item], 1);
    if(already_marked == 0) {
        int idx = atomicAdd(&queue_size[iter + 1], 1);
        queue[(iter + 1) * input_len + idx] = item;
    }
}
__global__ void processWorkItems(int input_len, int max_iter, int *histogram, int *queue, int *queue_size, int *marked) {
    auto grid = cooperative_groups::this_grid();
    const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;
    for(int iter = 0; iter < max_iter; ++iter) {
        while(true) {
            // Grab work item to process
            int idx = atomicSub(&queue_size[iter], 1);
            --idx;
            if(idx < 0) {
                break;
            }
            int item = queue[iter * input_len + idx];
            // Keep track of processed work items
            ++histogram[iter * input_len + item];
            // Add previous, self, and next work items to work queue
            if(item > 0) {
                addWorkItem(input_len, item - 1, item, iter, queue, queue_size, marked);
            }
            addWorkItem(input_len, item, item, iter, queue, queue_size, marked);
            if(item + 1 < input_len) {
                addWorkItem(input_len, item + 1, item, iter, queue, queue_size, marked);
            }
        }
        __threadfence_system();
        grid.sync();
        // Reset marked array for next iteration
        for(int i = 0; i < items_per_block; ++i) {
            if(blockIdx.x * items_per_block + i < input_len) {
                marked[blockIdx.x * items_per_block + i] = 0;
            }
        }
        __threadfence_system();
        grid.sync();
    }
}
int main(int argc, char* argv[])
{
    int input_len = atoi(argv[1]);
    int max_iter = atoi(argv[2]);
    int num_blocks = atoi(argv[3]);
    // A histogram to keep track of work items that have been processed in each iteration
    int histogram_host[input_len * max_iter];
    memset(histogram_host, 0, sizeof(int) * input_len * max_iter);
    int *histogram_device;
    checkCudaErrors(cudaMalloc(&histogram_device, sizeof(int) * input_len * max_iter));
    checkCudaErrors(cudaMemcpy(histogram_device, histogram_host, sizeof(int) * input_len * max_iter, cudaMemcpyHostToDevice));
    // Size of the work queue for each iteration
    int queue_size_host[max_iter + 1];
    queue_size_host[0] = input_len;
    memset(&queue_size_host[1], 0, sizeof(int) * max_iter);
    int *queue_size_device;
    checkCudaErrors(cudaMalloc(&queue_size_device, sizeof(int) * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_size_device, queue_size_host, sizeof(int) * (max_iter + 1), cudaMemcpyHostToDevice));
    // Work queue
    int queue_host[input_len * (max_iter + 1)];
    for(int i = 0; i < input_len; ++i) {
        queue_host[i] = i;
    }
    memset(&queue_host[input_len], 0, sizeof(int) * input_len * max_iter);
    int *queue_device;
    checkCudaErrors(cudaMalloc(&queue_device, sizeof(int) * input_len * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_device, queue_host, sizeof(int) * input_len * (max_iter + 1), cudaMemcpyHostToDevice));
    // An array used to keep track of work items already added to the work queue to
    // avoid multiple additions of a work item in the same iteration
    int marked_host[input_len];
    memset(marked_host, 0, sizeof(int) * input_len);
    int *marked_device;
    checkCudaErrors(cudaMalloc(&marked_device, sizeof(int) * input_len));
    checkCudaErrors(cudaMemcpy(marked_device, marked_host, sizeof(int) * input_len, cudaMemcpyHostToDevice));
    const dim3 threads(1, 1, 1);
    const dim3 blocks(num_blocks, 1, 1);
    processWorkItems<<<blocks, threads>>>(input_len, max_iter, histogram_device, queue_device, queue_size_device, marked_device);
    checkCudaErrors(cudaDeviceSynchronize());
    checkCudaErrors(cudaMemcpy(histogram_host, histogram_device, sizeof(int) * input_len * max_iter, cudaMemcpyDeviceToHost));
    // Count items processed more than once (extra) and items never processed (deficit)
    int extra = 0;
    double deficit = 0;
    for(int i = 0; i < input_len; ++i) {
        int cnt = 0;
        for(int iter = 0; iter < max_iter; ++iter) {
            if(histogram_host[iter * input_len + i] > 1) {
                ++extra;
            }
            cnt += histogram_host[iter * input_len + i];
        }
        deficit += max_iter - cnt;
    }
    printf("Skipped %.2f%% of items. %d extra item processing.\n", deficit / (input_len * max_iter) * 100, extra);
    checkCudaErrors(cudaFree(histogram_device));
    checkCudaErrors(cudaFree(queue_device));
    checkCudaErrors(cudaFree(queue_size_device));
    checkCudaErrors(cudaFree(marked_device));
    return 0;
}

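You may wish to read how to do a cooperative grid kernel launch in the programming guide, or else study any of the CUDA sample codes that use a grid sync (e.g. reductionMultiBlockCG, and there are others).

You're doing it incorrectly. You cannot launch a cooperative grid with the ordinary <<<...>>> launch syntax. Because of that, there is no reason to assume that the grid.sync() in your kernel is working correctly.

It's easy to see that the grid sync is not working in your code by running it under cuda-memcheck. When you do that, the results get drastically worse.

When I modify your code to do a proper cooperative launch, I have no issues on a Tesla V100: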
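As a minimal sketch of what a cooperative launch looks like in isolation (this is not taken from your code; the kernel neighborExchange and the helper runNeighborExchange are hypothetical names invented for illustration), each block publishes a value, the whole grid synchronizes, and then each block reads the value published by its neighbor. The sketch assumes the device supports cooperative launch and that the requested grid is small enough to be fully co-resident; if either assumption fails, the launch should return an error and the helper returns -1.

```cuda
#include <cooperative_groups.h>
#include <cuda_runtime.h>
#include <vector>

namespace cg = cooperative_groups;

// Hypothetical demo kernel: every block writes one slot, the grid
// synchronizes, then every block reads the slot of the next block.
__global__ void neighborExchange(int *slots, int *out) {
    cg::grid_group grid = cg::this_grid();
    if (threadIdx.x == 0) {
        slots[blockIdx.x] = blockIdx.x + 1;
    }
    // Grid-wide barrier; only valid if the kernel was launched cooperatively
    grid.sync();
    if (threadIdx.x == 0) {
        out[blockIdx.x] = slots[(blockIdx.x + 1) % gridDim.x];
    }
}

// Hypothetical helper: launches the kernel cooperatively and returns the
// number of blocks that read a wrong value, or -1 on any CUDA error.
int runNeighborExchange(int numBlocks) {
    int *slots = nullptr;
    int *out = nullptr;
    if (cudaMalloc(&slots, numBlocks * sizeof(int)) != cudaSuccess) return -1;
    if (cudaMalloc(&out, numBlocks * sizeof(int)) != cudaSuccess) {
        cudaFree(slots);
        return -1;
    }

    // Kernel arguments are passed as an array of pointers to each argument
    void *args[] = { &slots, &out };
    cudaError_t err = cudaLaunchCooperativeKernel(
        (void *)neighborExchange, dim3(numBlocks), dim3(32), args, 0, 0);
    if (err == cudaSuccess) err = cudaDeviceSynchronize();

    std::vector<int> host(numBlocks, 0);
    if (err == cudaSuccess) {
        err = cudaMemcpy(host.data(), out, numBlocks * sizeof(int),
                         cudaMemcpyDeviceToHost);
    }
    cudaFree(slots);
    cudaFree(out);
    if (err != cudaSuccess) return -1;

    // Block b should have read the value written by block (b + 1) % numBlocks
    int wrong = 0;
    for (int b = 0; b < numBlocks; ++b) {
        if (host[b] != (b + 1) % numBlocks + 1) ++wrong;
    }
    return wrong;
}
```

Calling runNeighborExchange(4) from main should return 0 when the barrier works. With CUDA 10.1 this would be built with -rdc=true, like the compile command in the question.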

$ cat t1811.cu
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <cooperative_groups.h>
#define checkCudaErrors(val) check ( (val), #val, __FILE__, __LINE__ )
template< typename T >
void check(T result, char const *const func, const char *const file, int const line)
{
    if (result)
    {
        fprintf(stderr, "CUDA error at %s:%d code=%d(%s) \"%s\" \n", file, line, static_cast<unsigned int>(result), cudaGetErrorString(result), func);
        cudaDeviceReset();
        exit(EXIT_FAILURE);
    }
}
__device__ inline void addWorkItem(int input_len, int item, int item_adder, int iter, int *queue, int *queue_size, int *marked) {
    int already_marked = atomicExch(&marked[item], 1);
    if(already_marked == 0) {
        int idx = atomicAdd(&queue_size[iter + 1], 1);
        queue[(iter + 1) * input_len + idx] = item;
    }
}
__global__ void processWorkItems(int input_len, int max_iter, int *histogram, int *queue, int *queue_size, int *marked) {
    auto grid = cooperative_groups::this_grid();
    const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;
    for(int iter = 0; iter < max_iter; ++iter) {
        while(true) {
            // Grab work item to process
            int idx = atomicSub(&queue_size[iter], 1);
            --idx;
            if(idx < 0) {
                break;
            }
            int item = queue[iter * input_len + idx];
            // Keep track of processed work items
            ++histogram[iter * input_len + item];
            // Add previous, self, and next work items to work queue
            if(item > 0) {
                addWorkItem(input_len, item - 1, item, iter, queue, queue_size, marked);
            }
            addWorkItem(input_len, item, item, iter, queue, queue_size, marked);
            if(item + 1 < input_len) {
                addWorkItem(input_len, item + 1, item, iter, queue, queue_size, marked);
            }
        }
        __threadfence_system();
        grid.sync();
        // Reset marked array for next iteration
        for(int i = 0; i < items_per_block; ++i) {
            if(blockIdx.x * items_per_block + i < input_len) {
                marked[blockIdx.x * items_per_block + i] = 0;
            }
        }
        __threadfence_system();
        grid.sync();
    }
}
int main(int argc, char* argv[])
{
    int input_len = atoi(argv[1]);
    int max_iter = atoi(argv[2]);
    int num_blocks = atoi(argv[3]);
    // A histogram to keep track of work items that have been processed in each iteration
    int *histogram_host = new int[input_len * max_iter];
    memset(histogram_host, 0, sizeof(int) * input_len * max_iter);
    int *histogram_device;
    checkCudaErrors(cudaMalloc(&histogram_device, sizeof(int) * input_len * max_iter));
    checkCudaErrors(cudaMemcpy(histogram_device, histogram_host, sizeof(int) * input_len * max_iter, cudaMemcpyHostToDevice));
    // Size of the work queue for each iteration
    int queue_size_host[max_iter + 1];
    queue_size_host[0] = input_len;
    memset(&queue_size_host[1], 0, sizeof(int) * max_iter);
    int *queue_size_device;
    checkCudaErrors(cudaMalloc(&queue_size_device, sizeof(int) * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_size_device, queue_size_host, sizeof(int) * (max_iter + 1), cudaMemcpyHostToDevice));
    // Work queue
    int *queue_host = new int[input_len * (max_iter + 1)];
    for(int i = 0; i < input_len; ++i) {
        queue_host[i] = i;
    }
    memset(&queue_host[input_len], 0, sizeof(int) * input_len * max_iter);
    int *queue_device;
    checkCudaErrors(cudaMalloc(&queue_device, sizeof(int) * input_len * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_device, queue_host, sizeof(int) * input_len * (max_iter + 1), cudaMemcpyHostToDevice));
    // An array used to keep track of work items already added to the work queue to
    // avoid multiple additions of a work item in the same iteration
    int marked_host[input_len];
    memset(marked_host, 0, sizeof(int) * input_len);
    int *marked_device;
    checkCudaErrors(cudaMalloc(&marked_device, sizeof(int) * input_len));
    checkCudaErrors(cudaMemcpy(marked_device, marked_host, sizeof(int) * input_len, cudaMemcpyHostToDevice));
    const dim3 threads(1, 1, 1);
    const dim3 blocks(num_blocks, 1, 1);
    int dev = 0;
    int supportsCoopLaunch = 0;
    checkCudaErrors(cudaDeviceGetAttribute(&supportsCoopLaunch, cudaDevAttrCooperativeLaunch, dev));
    if (!supportsCoopLaunch) {printf("Cooperative Launch is not supported on this machine configuration.  Exiting."); return 0;}
    /// This will launch a grid that can maximally fill the GPU, on the default stream with kernel arguments
    int numBlocksPerSm = 0;
    // Number of threads my_kernel will be launched with
    int numThreads = threads.x;
    cudaDeviceProp deviceProp;
    checkCudaErrors(cudaGetDeviceProperties(&deviceProp, dev));
    checkCudaErrors(cudaOccupancyMaxActiveBlocksPerMultiprocessor(&numBlocksPerSm, processWorkItems, numThreads, 0));
    // launch
    void *kernelArgs[] = { &input_len, &max_iter, &histogram_device, &queue_device, &queue_size_device, &marked_device};
    dim3 dimBlock = dim3(numThreads,1,1);
    // Clamp the grid so that all blocks can be resident on the device at once
    num_blocks = min(num_blocks, deviceProp.multiProcessorCount*numBlocksPerSm);
    dim3 dimGrid(num_blocks, 1, 1);
    printf("launching %d blocks\n", dimGrid.x);
    checkCudaErrors(cudaLaunchCooperativeKernel((void*)processWorkItems, dimGrid, dimBlock, kernelArgs));
    // processWorkItems<<<blocks, threads>>>(input_len, max_iter, histogram_device, queue_device, queue_size_device, marked_device);
    checkCudaErrors(cudaDeviceSynchronize());
    checkCudaErrors(cudaMemcpy(histogram_host, histogram_device, sizeof(int) * input_len * max_iter, cudaMemcpyDeviceToHost));
    int extra = 0;
    double deficit = 0;
    for(int i = 0; i < input_len; ++i) {
        int cnt = 0;
        for(int iter = 0; iter < max_iter; ++iter) {
            if(histogram_host[iter * input_len + i] > 1) {
                ++extra;
            }
            cnt += histogram_host[iter * input_len + i];
        }
        deficit += max_iter - cnt;
    }
    printf("Skipped %.2f%% of items. %d extra item processing.\n", deficit / (input_len * max_iter) * 100, extra);
    checkCudaErrors(cudaFree(histogram_device));
    checkCudaErrors(cudaFree(queue_device));
    checkCudaErrors(cudaFree(queue_size_device));
    checkCudaErrors(cudaFree(marked_device));
    return 0;
}
$ nvcc -o t1811 t1811.cu -arch=sm_70 -std=c++11 -rdc=true
$ cuda-memcheck ./t1811 50 1000 5000
========= CUDA-MEMCHECK
launching 2560 blocks
Skipped 0.00% of items. 0 extra item processing.
========= ERROR SUMMARY: 0 errors
$ cuda-memcheck ./t1811 50 1000 1000
========= CUDA-MEMCHECK
launching 1000 blocks
Skipped 0.00% of items. 0 extra item processing.
========= ERROR SUMMARY: 0 errors
$ ./t1811 50 1000 5000
launching 2560 blocks
Skipped 0.00% of items. 0 extra item processing.
$ ./t1811 50 1000 1000
launching 1000 blocks
Skipped 0.00% of items. 0 extra item processing.
$ ./t1811 50 1000 1000
launching 1000 blocks
Skipped 0.00% of items. 0 extra item processing.
$

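I'm not suggesting that the above code is defect-free or suitable for any particular purpose. It is mostly your code. I've modified it only to demonstrate the concepts mentioned above.

As an aside, I changed a few of your large stack-based memory allocations to heap-based ones. I don't recommend trying to create large stack-based arrays such as this: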

int histogram_host[input_len * max_iter];

In my opinion it's better to do this:

int *histogram_host = new int[input_len * max_iter];

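As your input command-line parameters become larger, the stack-based version may become a problem, depending on the machine characteristics. This doesn't have much to do with CUDA, however. I've not tried to tackle every instance of this pattern in your code.

Although not relevant to this particular question, grid sync has other requirements for successful use as well. These are covered in the programming guide and may include, but are not limited to:

  • platform support (e.g. OS, GPU, etc.)
  • kernel sizing requirements (total number of threads or threadblocks launched)

The programming guide contains convenient boilerplate code that may be used to satisfy these requirements.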
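One further option, offered only as a sketch and not something used in the code above, is to let std::vector own the host buffer, so the memory lives on the heap and is freed automatically. The helper name makeZeroedHostBuffer is hypothetical; it also widens the element count to std::size_t before multiplying, on the assumption that input_len * max_iter could overflow an int for large inputs.

```cuda
#include <cstddef>
#include <vector>

// Hypothetical helper: returns a zero-initialized, heap-backed host buffer
// holding input_len * max_iter ints.
std::vector<int> makeZeroedHostBuffer(int input_len, int max_iter) {
    // Widen before multiplying so the product is computed in std::size_t
    const std::size_t count =
        static_cast<std::size_t>(input_len) * static_cast<std::size_t>(max_iter);
    return std::vector<int>(count, 0);
}
```

With this, histogram_host would be declared as std::vector<int> histogram_host = makeZeroedHostBuffer(input_len, max_iter); and histogram_host.data() would be passed wherever cudaMemcpy expects the raw pointer. The memset call then becomes unnecessary, because the vector is already zero-filled.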
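The two checks in the list above can be packaged into a small helper. The following is a sketch under stated assumptions, not the programming guide's own listing: maxCooperativeBlocks and placeholderKernel are hypothetical names, and the helper assumes that device is the currently selected device, since the occupancy query applies to the current device. It uses the same cudaDeviceGetAttribute and cudaOccupancyMaxActiveBlocksPerMultiprocessor calls as the modified code above.

```cuda
#include <cstddef>
#include <cuda_runtime.h>

// Hypothetical stand-in for the kernel that will be launched cooperatively
__global__ void placeholderKernel() {}

// Hypothetical helper. Returns the largest grid size (in blocks) that can be
// co-resident on the device for the given kernel and block size, 0 if the
// device does not support cooperative launch, or -1 on a CUDA error.
int maxCooperativeBlocks(const void *kernel, int threadsPerBlock,
                         std::size_t dynamicSmemBytes, int device) {
    // Platform support check
    int supportsCoopLaunch = 0;
    if (cudaDeviceGetAttribute(&supportsCoopLaunch,
                               cudaDevAttrCooperativeLaunch,
                               device) != cudaSuccess) {
        return -1;
    }
    if (!supportsCoopLaunch) return 0;

    // Kernel sizing check: blocks per SM times number of SMs
    int numSms = 0;
    if (cudaDeviceGetAttribute(&numSms, cudaDevAttrMultiProcessorCount,
                               device) != cudaSuccess) {
        return -1;
    }
    int blocksPerSm = 0;
    if (cudaOccupancyMaxActiveBlocksPerMultiprocessor(
            &blocksPerSm, kernel, threadsPerBlock,
            dynamicSmemBytes) != cudaSuccess) {
        return -1;
    }
    return numSms * blocksPerSm;
}
```

A caller would clamp its requested grid to this value before calling cudaLaunchCooperativeKernel, for example by taking the minimum of num_blocks and maxCooperativeBlocks((const void *)processWorkItems, 1, 0, 0), and would refuse to launch when the result is 0 or negative.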
