void AFCQueue::ExtractValuesSecComplex(int startIndex, int endIndex,int helperIndex)
{
int size = 0,i,index;
TimeType min_timestamp;
bool is_singleQueue = false;
TimeType* local_queue_time = helper_queue_time[helperIndex];
int* local_queue_value = helper_queue_value[helperIndex];
volatile int& in_local_helper = in_sec[helperIndex];
volatile int& out_local_helper = out_sec[helperIndex];
NodeArrayBlock * heads_local[_MAX_THREADS];
NodeArrayBlock *tails_local[_MAX_THREADS];
int outs_local[_MAX_THREADS];
int ins_local[_MAX_THREADS];
TimeType local_timearray[_MAX_THREADS];
int min_index = 0;
min_timestamp = timestamps_arr[startIndex];
for (i=startIndex,index=startIndex ; i < endIndex; i++){
heads_local[i] = (NodeArrayBlock *)heads[i];
outs_local[i] = outs[i];
tails_local[i] = (NodeArrayBlock *)tails[i];
ins_local[i] = ins[i];
local_timearray[i] = timestamps_arr[i];
if (local_timearray[i] < min_timestamp){
min_timestamp = local_timearray[i];
index = i;
}
}
do{
//if central queue is full
while((out_local_helper-1)==in_local_helper ||
(out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) || _gIsStopThreads){
if (_gIsStopThreads)
return;
}
local_queue_time[in_local_helper] = heads_local[index]->_timestamp_arr[outs_local[index]];
local_queue_value[in_local_helper] = heads_local[index]->_values_arr[outs_local[index]++];
if (in_local_helper < HELPERS_QUEUE_SIZE_1)
in_local_helper++;
else
in_local_helper = 0;
if (outs_local[index] == _INIT_SIZE){
heads_local[index]->_free = true;
heads_local[index] = heads_local[index]->_next;
if (heads_local[index]==null)
{
tails_local[index] = null;
ins_local[index]=0;
}
outs_local[index] = 0;
}
if (ins_local[index] == outs_local[index] &&
heads_local[index]==tails_local[index])
{
//if it was not the last local queue in the array of snapshots
if (--endIndex != index){
heads_local[index] = heads_local[endIndex];
tails_local[index] = tails_local[endIndex];
outs_local[index] = outs_local[endIndex];
ins_local[index] = ins_local[endIndex];
local_timearray[index] = local_timearray[endIndex];
}
if ((endIndex-startIndex)==1)
is_singleQueue = true;
heads_local[endIndex] = null;
}else{
local_timearray[index] = heads_local[index]->_timestamp_arr[outs_local[index]];
}
//If a single Queue left, no need to check timestamps
if (is_singleQueue){
int out = outs_local[startIndex];
int in = ins_local[startIndex];
NodeArrayBlock* he = heads_local[startIndex];
NodeArrayBlock* ta = tails_local[startIndex];
int* value_arr = he->_values_arr;
TimeType* time_arr = he->_timestamp_arr;
while (true){
if ((in == out && he==ta))
{
//heads[startIndex] = null;
return;
}
if (out == _INIT_SIZE){
he->_free = true;
he = he->_next;
if (he==null)
{
//heads[startIndex]=null;
return;
}
value_arr = he->_values_arr;
time_arr = he->_timestamp_arr;
out = 0;
}
while((out_local_helper-1)==in_local_helper ||
(out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) ||
_gIsStopThreads){
if (_gIsStopThreads)
return;
}
if (he==ta){
if (out_local_helper <= in_local_helper){
min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,in-out);
}else{
min_index = Math::Min(out_local_helper-1-in_local_helper,in-out);
}
}else{
if (out_local_helper <= in_local_helper){
min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,_INIT_SIZE-out);
}else{
min_index = Math::Min(out_local_helper-1-in_local_helper,_INIT_SIZE-out);
}
}
memcpy(&local_queue_time[in_local_helper],&time_arr[out],min_index * sizeof(*time_arr));
memcpy(&local_queue_value[in_local_helper],&value_arr[out],min_index * sizeof(*value_arr));
in_local_helper+=min_index;
out+=min_index;
if (in_local_helper == HELPERS_QUEUE_SIZE)
in_local_helper = 0;
}
}
if (endIndex==startIndex)
break;
min_timestamp = local_timearray[startIndex];
for(i = startIndex+1,index=startIndex; i < endIndex ;i++){
if (local_timearray[i] < min_timestamp){
min_timestamp = local_timearray[i];
index = i;
}
}
}while(true);
}
这是我算法中的一个片段,此函数专用于单个线程,该线程迭代队列数量(分别有带有时间戳的队列和带有值的队列)
执行此方法的每个线程都会遍历 X 个队列,并将它们合并到时间戳和值的单个循环队列中。
此函数遭受大量缓存未命中,
如何改进它以减少缓存未命中(多个线程同时使用不同的 id 执行此方法 - helperIndex)
这不是真正的代码问题,因此发布代码无效。
要限制缓存未命中,请更改 DATA,以便每个线程一次只需在自己的 [L1 缓存大小] 块上工作。 使用合并排序,这相当容易。
典型的有效合并排序将使用线程池和合并任务,这些线程和合并任务拆分其输入分区并生成子合并,直到任务获得小于 [L1 缓存大小] 的分区,然后使用就地排序(如快速排序)来完成最后一位。
拆分可以使用一个额外的 [数据大小] 缓冲区来完成,任务在完成快速排序后执行插入排序时在缓冲区之间移动数据。 应该不需要任何记忆复制。
仅仅采用单线程内联代码并使其在多个线程上工作,而不考虑数据,这是无效的。
您有很多缓存未命中。您必须记住的是,CPU与L3缓存和RAM之间只有一条总线。因此,如果线程 1 忙于读取内存,然后挂起以允许另一个线程执行相同的操作但使用不同的内存,则需要重新加载缓存。当进程挂起时也会发生这种情况 - 当进程恢复执行时,缓存需要重新加载。
若要限制缓存未命中数,请将线程数限制为物理内核数(忽略超线程内核)。如果线程数多于内核数,则每次进行线程切换时都需要更新缓存。如果您有相同数量的线程和内核,则可以减少将缓存丢失到另一个线程/进程的机会。您想尝试 num_cores - 1 个线程,看看是否有帮助。
此外,您的代码非常大且未注释。很难看出你到底在做什么。