C++中多线程的安全性与速度



如果我有一个数组,我想由多个线程同时更新,那么最好/最快的方法是什么?例如,假设我有以下代码:

std::vector<float> vec;
vec.push_back(0.f);
for(int i = 0; i < 10000; i++) {
std::thread([&]{ 
// SAFETY CONSTRUCTS GO HERE
vec[0] += 1; // OR MAYBE HERE
// AND HERE? 
});
}
// wait a little while, i.e. I was too lazy to write out joins
std::cout << vec[0];

如果我希望这是安全的,并最终打印出值10000,那么最好/最快的方法是什么?

在您给出的示例中,最好/最安全的方法是不启动线程,只需在循环中更新v[0]。启动和同步线程的开销可能会超过并行执行某些操作所带来的任何好处。

v是一个非原子对象(std::vector<float>),而v[0]实际上是一个函数调用。这样的对象及其非静态成员函数无法保护自己免受多个线程的并发访问。要从多个线程使用它们,必须同步每次直接使用v(和v[0])。

通常,涉及并发执行线程的安全性是通过同步访问由多个线程更新和访问的任何变量(或者更一般地说,内存)来实现的。

如果使用互斥,通常意味着所有访问共享数据的线程必须首先获取互斥,对共享变量执行操作(例如更新v[0]),然后释放互斥。如果线程没有抓取(或者抓取然后释放)互斥锁,那么它所做的所有操作都不能触及共享变量。

若您希望通过线程来获得性能,则需要在每个线程中完成大量的工作,而不需要对共享变量进行任何访问。由于部分工作可以同时执行,因此可以在更短的总运行时间内执行。为了代表性能优势,收益(例如,通过同时执行大量操作)需要超过成本(启动线程、同步访问多个线程访问的任何数据)。

这在与您所展示的代码类似的任何代码中都是极不可能的。

关键是,当线程共享任何数据时,总是要在速度和安全性之间进行权衡。安全要求同步更新共享变量,无一例外。性能增益通常来源于不需要同步的东西(即不访问线程之间共享的变量),并且可以并行执行。

没有一种神奇的技术可以对共享数据进行高性能的并行访问,但您经常会看到一些通用技术。

我将使用并行求和数组的例子来回答,但这些技术通常适用于许多并行算法。

1) 首先避免共享数据

这可能是最安全、最快的方法。与其让工作线程直接更新共享状态,不如让每个工作线程使用自己的本地状态,然后让主线程组合结果。对于数组求和的例子,这可能看起来像这样:

int main() {
std::vector<int> toSum = getSomeVector();
std::vector<int> sums(NUM_THREADS);
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd](int& result) mutable {
for (; chunkBegin != chunkEnd; ++chunkBegin) {
result += *chunkBegin;
}
}, std::ref(sums[i]));
}
for (std::thread& thd : threads) {
thd.join();
}
int finalSum = 0;
for (int partialSum : sums) {
finalSum += partialSum;
}
std::cout << finalSum << 'n';
}

由于每个线程只对自己的部分和进行操作,因此它们不能相互干扰,也不需要额外的同步。最后必须做一点额外的工作才能将所有的部分和相加,但部分结果的数量很少,因此开销应该非常小。

2) 相互排斥

您可以使用锁定机制来保护共享状态,而不是让每个线程在自己的状态下操作。通常情况下,这是一个互斥锁,但有很多不同的锁定原语具有稍微不同的角色。这里的重点是确保一次只有一个线程使用共享状态。使用此技术时要非常小心,以避免在紧密循环中访问共享状态。由于一次只有一个线程可以持有锁,因此很容易意外地将您喜欢的并行代码转换回单线程代码,使其一次只能工作一个线程。

例如,考虑以下内容:

int main() {
std::vector<int> toSum = getSomeVector();
int sum = 0;
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
std::mutex mtx;
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd, &mtx, &sum]() mutable {
for (; chunkBegin != chunkEnd; ++chunkBegin) {
std::lock_guard guard(mtx);
sum += *chunkBegin;
}
});
}
for (std::thread& thd : threads) {
thd.join();
}
std::cout << sum << 'n';
}

由于每个线程都将mtx锁定在其循环中,因此一次只能有一个线程在做任何工作。这里没有并行化,并且由于分配线程以及锁定和解锁互斥锁的额外开销,此代码可能比等效的单线程代码慢。

相反,尝试尽可能多地独立进行,并尽可能少地访问共享状态。对于这个示例,您可以执行与(1)中的示例类似的操作,并在每个线程中建立部分和,最后只将它们添加到共享和中一次:

int main() {
std::vector<int> toSum = getSomeVector();
int sum = 0;
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
std::mutex mtx;
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd, &mtx, &sum]() mutable {
int partialSum = 0;
for (; chunkBegin != chunkEnd; ++chunkBegin) {
partialSum += *chunkBegin;
}
{
std::lock_guard guard(mtx);
sum += partialSum;
}
});
}
for (std::thread& thd : threads) {
thd.join();
}
std::cout << sum << 'n';
}

3) 原子变量

原子变量是可以在线程之间"安全"共享的变量。它们非常强大,但也很容易出错。你必须担心内存排序约束之类的问题,当你把它们弄错时,很难调试和找出你做错了什么。

在其核心,原子变量可以实现为一个简单的变量,其操作由互斥或类似的对象保护。神奇之处在于实现,它经常使用特殊的CPU指令来协调对CPU级别变量的访问,以避免锁定和解锁的大量开销。

原子弹并不是一颗神奇的子弹。这仍然涉及到开销,而且你仍然可以通过过于频繁地访问原子来射中自己的脚。您的CPU进行大量缓存,让多个线程写入一个原子变量可能意味着将内容溢出内存,或者至少溢出到更高级别的缓存。再一次,如果你可以避免在线程中的紧密循环中访问你的共享状态,你应该这样做:

int main() {
std::vector<int> toSum = getSomeVector();
std::atomic<int> sum(0);
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;

threads.emplace_back([chunkBegin, chunkEnd, &sum]() mutable {
int partialSum = 0;
for (; chunkBegin != chunkEnd; ++chunkBegin) {
partialSum += *chunkBegin;
}
// Since we don't care about the order that the threads update the sum,
// we can use memory_order_relaxed.  This is a rabbit-hole I won't get
// too deep into here though.
sum.fetch_add(partialSum, std::memory_order_relaxed);
});
}
for (std::thread& thd : threads) {
thd.join();
}
std::cout << sum << 'n';
}

最新更新