在线程函数中处理数据向量时进行线程竞速



作为一个早期的 c++/线程编码器,我在其中一个测试函数中遇到了一些线程赛车的困难,非常感谢一些反馈。

我的 parent(( 函数接收一个相当大的图像向量(来自 openCV 的 cv::Mat(作为输入,任务是分别计算每个图像的运算符(例如膨胀(。我编写了一个循环,它使用 worker(( 函数创建线程,并在每个线程上传递我的输入向量的子集。

每个线程的结果将存储在该输入子集向量上。我的问题是我无法从 parent(( 中检索它。

作为替代方案,我将整个向量传递给 worker((,每个线程都有开始和结束索引,但随后我遇到了一些严重的线程竞速问题,比串行方法消耗更多的时间。

请参阅下面的代码。

std::vector<cv::Mat> worker(std::vector<cv::Mat>& ctn);
std::vector<cv::Mat> worker(std::vector<cv::Mat>& ctn) {
int erosion_type = cv::MORPH_RECT;
int erosion_size = 5;
cv::Mat element = cv::getStructuringElement( erosion_type,
cv::Size( 2*erosion_size + 1, 2*erosion_size+1 ),
cv::Point( erosion_size, erosion_size ) );
this_mutex.lock();
for(uint it=0; it<ctn.size(); ++it) {
cv::erode(ctn[it], ctn[it], element);
}
this_mutex.unlock();
return ctn;
}

void parent(std::vector<cv::Mat>& imageSet) {
auto start = std::chrono::steady_clock::now();
const auto processor_count = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
const int grainsize = imageSet.size() / processor_count;
uint work_iter = 0;
std::vector<cv::Mat> target; // holds the output vector
// create the threads
for(uint it=0; it<processor_count-1; ++it) {
std::vector<cv::Mat> subvec(imageSet.begin() + work_iter, imageSet.begin() + work_iter + grainsize);
threads.emplace_back([&,it]() {
std::vector<cv::Mat> tmp = worker(subvec);
target.insert(target.end(), tmp.begin(), tmp.end());
});
work_iter += grainsize;
}
// create the last thread for the remainder of the vector elements
std::vector<cv::Mat> subvec(imageSet.begin() + work_iter, imageSet.end());
int it = processor_count-1;
threads.emplace_back([&,it]() {
std::vector<cv::Mat> tmp = worker(subvec);
target.insert(target.end(), tmp.begin(), tmp.end());
});
// join the threads
for(int i=0; i<threads.size(); ++i) {
threads[i].join();
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed_seconds = end-start;
std::cout << "elapsed time: " << elapsed_seconds.count() << "sn";
// try to reconstruct the output  
imageSet.clear();
for(int i=0; i<target.size(); ++i) {
imageSet.push_back(target[i]);
}
}

在此代码中,语句target.insert(target.end(), tmp.begin(), tmp.end())旨在将 target[ ] 向量与每个线程的结果连接起来,但它没有及时执行,因此我在最后得到一个空的目标 []。

任何想法如何让目标[]收集所有tmp[]?

你在哪里想这样的事情? 这会单独处理它们,但您可以根据需要将其切块,并根据需要从 lamda 返回一个向量。
注意:这是在 C++11 中,因为这是您标记的内容。如果您可以访问 17,这将变得简单得多。

#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <iostream>
int main()
{
std::vector<int> input{0,1,2,3,4,5,6,7,8,9,10};
for(const auto& item : input)
{
std::cout << item << " ";
}
std::cout << std::endl;
std::vector<std::future<int>> threads{};
for(const auto& item : input)
{
threads.push_back(std::async(std::launch::async, [&item]{
return item * 100;
}));
}
std::vector<int> output{};
for(auto& thread : threads)
{
output.push_back(thread.get());
}
for(const auto& item : output)
{
std::cout << item << " ";
}
return 0;
}

每个线程一个结果 (res(。

#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <cassert>
void threadFunction (std::vector<int> &speeds, int start, int end, std::vector<int>& res);
int main()
{
std::vector<int> images (100000);
auto processor_count = std::thread::hardware_concurrency();
auto step = images.size() / processor_count;
auto startFrom = 0;
// one result vector (res) for each thread (t).
std::vector<std::thread>t;
std::vector<std::vector<int>>res (processor_count);
// Start the threads
for (auto i = 0; i < processor_count; ++i) 
{
auto th = std::thread(threadFunction, std::ref(images), startFrom, startFrom+step, std::ref(res[i]));
t.push_back(std::move(th));
startFrom += step;
}
// Join
std::for_each(begin(t), end(t), [](std::thread &t)
{
assert(t.joinable());
t.join();
});
// Results here. Each thread puts the results in res[i];
return 0;
}
void threadFunction (std::vector<int> &images, int start, int end, std::vector<int>& res)
{
for (int i = start; i <= end; ++i)
res.push_back(images[i]);
}

最新更新