我有很多线程在多个数据项上进行工作。线程必须按照相同的顺序将结果删除,我将数据交给线程。那就是:
Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing
不管先完成哪个线程处理,都应以相同的顺序以相同的顺序将结果传递给线程。即:
Thread #1: put data
Thread #2: put data
...
要区分线程并管理它们,我给了每个线路一个ID (0,1,2,...,n)
。我正在使用ID将数据分配给每个线程,以便可以对其进行处理。
for(int i=0; i<thread_count; i++)
give_data(i); // i is id and the function knows where to get data from
我希望线程共享一个令牌,该令牌确定预期哪个线程会产生结果。所有螺纹身体都是相同的,身体看起来像:
while(true){
auto data = get_data();
result = process_data(data);
while(token != this_id) spin;
put_data(result); // this is a synchronized call
update_token(token);
}
我的问题附带token
。我首先尝试了一个正常参考(int & token
(,显然无法正常工作(我没想到它(。无论如何,我使用了一个静态变量,并且线程并不总是得到最新的变量。我惊讶地看到一个线程主导着一切。每当线程更新令牌时,它都会丢失转弯,允许另一个线程放置其结果,依此类推。但是,我有一个线程主导,好像令牌总是设置为自己的ID,而不是更新。
如果我不得不猜测我会说这是一个缓存问题。但是,我不确定。
无论如何,我正在考虑将std::atomic<int>
用作令牌。它可以工作吗?如果没有,我还应该考虑做什么?同步这些线程的更好方法是什么?
Extra:这感觉像是一个不好的设计,我不确定如何做得更好。任何建议都将不胜感激。
无论如何,我使用了一个静态变量,并且线程并不总是得到最新的变量。我很惊讶地看到一个线程占主导地位
是的,多个线程访问相同的不同步值,其中至少一个写入其中的是A Data Race ,根据C 标准,这是不确定的行为。一切都可能发生。
我正在考虑使用std ::原子作为我的令牌。它可以工作吗?
是。这将防止令牌上的任何数据竞赛。我在您的伪代码中看不到其他任何直接问题,因此从这个角度看,它看起来不错。
这感觉像是一个不好的设计,我不确定如何做得更好。任何建议都将不胜感激。
整个设计看起来确实有些奇怪,但是如果有一种更简单的表达方法,则取决于您的线程库。例如,使用OpenMP,您可以对此进行一次(give_data
和get_data
背后的逻辑都不清楚,以至于无法使其完整(:
#pragma omp parallel
{
int threadCount = omp_get_num_threads();
#pragma omp single
for (int i = 0; i < threadCount; ++i)
give_data(i);
#pragma omp ordered for ordered schedule(static)
for (int i = 0; i < threadCount; ++i)
{
auto data = get_data();
result = process_data(data);
#pragma omp ordered
put_data(result); // this is a synchronized call
}
}
ordered
指令强迫put_data
呼叫以相同的顺序执行(一对一(,就好像循环是串行的,而线程仍然可以并行进行先前的数据处理。
,如果您只想做的所有想要做的就是使数据处理与有序写入的一个很大的数据处理,那么
实际上可能会更容易使用:
#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
auto data = get_data(i); // whatever this would entail
auto result = process_data(data);
#pragma omp ordered
put_data(result); // this is a synchronized call
}
看起来您不需要数据项分配,但是如果您实际上这样做,则此方法将无法轻易起作用,因为您只能在每个订购循环中只有一个订购的部分。
<</p>max的答案很棒。如果我有时间使用OpenMP,那么我会这样做。但是,我不是这就是为什么我要将这个答案发布到我的问题上的原因。
在我以前的设计中,它取决于彼此同步的线程,这似乎不是最好的想法,因为这么多会出错。相反,我决定让经理同步他们的结果(我从Max的最后一个代码片段中得出了想法(。
void give_threads_data(){
vector<pair<data, promise<result>*> promises(threads.size());
vector<future<result>> futures(threads.size());
for(int i=0; i<threads.size(); i++){
data d = get_data();
threads[i].put_data(d, promises[i]);
futures[i] = promises[i].get_future();
}
for(int i=0; i<futures.size(); i++){
result = futures[i].get();
// handle result
}
}
这样,我能够像将结果发送到线程一样获得结果。线体变得更加干净:
void thread_body(){
while(true){
pair<data, promise<result>*> item = queue.get(); // blocking call
data d = item.first;
promise<result>* promise = item.second;
result r = process_data(d);
promise->set_value(r);
}
}
没有游戏,结果是完美的。下次我进行线程时,我将考虑OpenMP。