对于我的班级项目,我们的任务是选择一个算法并使用c++线程实现它。我使用不同的文件来生成随机数,然后使用.sh文件运行快速排序,该文件写入算法的运行时,然后我继续得到无序。当我查看我的。out文件(该文件应该对元素进行排序)时,它对它们进行排序,然后重新开始。例如,它将从101->999开始,然后重新开始101->999,我假设这是因为我合并文件的方式有问题。我一直在看代码,却看不出发生了什么。如果有人能帮我纠正这个问题或指出错误的地方,我将不胜感激。
#include <iostream>
#include <fstream>
#include <thread>
#include <vector>
#include <algorithm>
using namespace std;
const int SIZE = 1000000;
const int THREAD_COUNT = 12;
void QuickSort(int* array, int left, int right) {
int i = left, j = right;
int pivot = array[(left + right) / 2];
while (i <= j) {
while (array[i] < pivot) i++;
while (array[j] > pivot) j--;
if (i <= j) {
swap(array[i], array[j]);
i++;
j--;
}
}
if (left < j) {
QuickSort(array, left, j);
}
if (i < right) {
QuickSort(array, i, right);
}
}
void Merge(int* array, int size, const vector<int>& sizes) {
int* temp = new int[size];
int index = 0;
for (int i = 0; i < sizes.size(); i++) {
copy(array + index, array + index + sizes[i], temp + index);
index += sizes[i];
}
copy(temp, temp + size, array);
delete[] temp;
}
void SortThread(int* array, int left, int right, vector<int>* sizes) {
QuickSort(array, left, right);
sizes->push_back(right - left + 1);
}
int main(int argc, char* argv[]) {
int array[SIZE];
int size = 0;
int num = 0;
ifstream fin(argv[1]);
while (fin >> num && size < SIZE) {
array[size] = num;
size++;
}
fin.close();
vector<thread> threads;
vector<int> sizes;
int chunkSize = size / THREAD_COUNT;
int left = 0;
int right = chunkSize - 1;
for (int i = 0; i < THREAD_COUNT; i++) {
if (i == THREAD_COUNT - 1) {
right = size - 1;
}
threads.emplace_back(SortThread, array, left, right, &sizes);
left = right + 1;
right = left + chunkSize - 1;
}
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i].join();
}
Merge(array, size, sizes);
ofstream fout("mysort.out", ofstream::out);
if (!fout){
cerr << "Error" << endl;
return 1;
}
for (int i = 0; i < size; i++) {
fout << array[i] << "n";
}
fout.close();
return 0;
}
我已经尝试了不同的方法合并线程在一起,并多次重写代码,但我一直得到相同的错误。
一个问题是,当其他线程从同一内存读写时,您的程序写入内存。这种数据竞争使得程序具有未定义的行为——这意味着结果几乎可以是任何东西。
我可以马上发现的是,你在线程中执行sizes->push_back(right - left + 1);
,而不确保每个线程都单独执行。
可以通过使用std::mutex
:
sizes
的互斥访问。#include <mutex>
std::mutex mtx;
void SortThread(int* array, int left, int right, std::vector<int>& sizes) {
QuickSort(array, left, right);
std::lock_guard lock(mtx); // now only one thread at a time can update `sizes`:
sizes.push_back(right - left + 1);
}
请注意,我采用sizes
作为引用,因为nullptr
不应该被允许。这还要求您将其打包在调用站点的std::reference_wrapper
中:
threads.emplace_back(SortThread, array, left, right, std::ref(sizes));
// ^^^^^^^^^^^^^^^
您可能有其他类似的问题-修复是相同的。保护数据,使任何线程都不能在另一个线程更新数据时读/写数据。如果你正在使用clang++
或g++
,你可以用ThreadSanitizer库进行编译,以在运行时获得数据竞争检查。添加编译器选项:
-g -fsanitize=thread
当这已经被修复,那么是的,你的Merge
函数实际上并不合并范围。它只是将每个范围复制到temp
数组中,然后将结果复制回array
。因此,最终结果与输入完全相同。
您需要从具有最小值的范围复制值,并一直复制直到所有范围耗尽。
可以像这样:
void Merge(int* array, std::size_t size, const vector<int>& sizes) {
std::size_t index = 0;
// begin and end iterators for the ranges:
std::vector<std::pair<int*, int*>> its;
its.reserve(size);
for(std::size_t i = 0; i < sizes.size(); i++) {
its.emplace_back(array + index, array + index + sizes[i]);
index += sizes[i];
}
auto temp = std::make_unique_for_overwrite<int[]>(size);
for(int* out = temp.get(); not its.empty(); ++out) {
// find the iterator pointing at the smallest value
auto smallest = std::min_element(its.begin(), its.end(),
[](const std::pair<int*, int*>& lhs,
const std::pair<int*, int*>& rhs) {
return *lhs.first < *rhs.first;
});
*out = *smallest->first; // copy the smallest value to temp
// step the iterator and if it reaches the end, remove it:
if(++smallest->first == smallest->second) its.erase(smallest);
}
std::copy(temp.get(), temp.get() + size, array);
}