并行归并排序中的性能问题



我尝试使用线程和模板编写合并排序的并行实现。相关代码如下:

我比较了性能与sort从c++ STL。当没有线程生成时,我的代码比std::sort慢6倍。使用变量maxthreads(和/或FACTOR),我只能将性能提高一倍,因此在最好的情况下,我的速度比std::sort慢3倍。我已经在16核多处理器机器上尝试了代码。

htop显示内核按预期使用,但是为什么在性能上缺乏并且我在整个运行时中没有感觉到并行性?

出错了吗?

谢谢你的回复。

#define FACTOR 1
static unsigned int maxthreads = FACTOR * std::thread::hardware_concurrency();
unsigned int workers=0;
std::mutex g_mutex;
template <typename T>
std::vector<T>* mergesort_inplace_multithreading(
    typename std::vector<T>::iterator* listbegin,
    typename std::vector<T>::iterator *listend,
    std::vector<T>* listarg)
{
    if (*listbegin == *listend)
    {
        return listarg;
    }
    else if (*listend == *listbegin + 1)
    {
        return listarg;
    }
    else
    {
        size_t offset = std::distance(*listbegin, *listend)/2;
        typename std::vector<T>::iterator listhalf = *listbegin + offset;
        g_mutex.lock();
        if (::workers <= maxthreads-2 and maxthreads >=2)
        {
            workers += 2;
            g_mutex.unlock();
            std::thread first_thread(mergesort_inplace_multithreading<T>, listbegin, &listhalf, listarg);
            std::thread second_thread(mergesort_inplace_multithreading<T>, &listhalf, listend, listarg);
            first_thread.join();
            second_thread.join();
            g_mutex.lock();
            workers -= 2;
            g_mutex.unlock();
        }
        else
        {
            g_mutex.unlock();
            mergesort_inplace_multithreading<T>(listbegin, &listhalf, listarg);
            mergesort_inplace_multithreading<T>(&listhalf, listend, listarg);
        }
        typename std::vector<T> result;
        typename std::vector<T>::iterator lo_sorted_it = *listbegin;
        typename std::vector<T>::iterator hi_sorted_it = listhalf;
        typename std::vector<T>::iterator lo_sortedend = listhalf;
        typename std::vector<T>::iterator hi_sortedend = *listend;
        while (lo_sorted_it != lo_sortedend and hi_sorted_it != hi_sortedend)
        {
            if (*lo_sorted_it <= *hi_sorted_it)
            {
                result.push_back(*lo_sorted_it);
                ++lo_sorted_it;
            }
            else
            {
                result.push_back(*hi_sorted_it);
                ++hi_sorted_it;
            }
        }//end while
        if (lo_sorted_it != lo_sortedend)
        {
            //assert(hi_sorted_it == hi_sortedend);
            result.insert(result.end(), lo_sorted_it, lo_sortedend);
        }
        else
        {
            //assert(lo_sorted_it == lo_sortedend);
            result.insert(result.end(), hi_sorted_it, hi_sortedend);
        }
        std::copy(result.begin(), result.end(), *listbegin);
        return listarg;
    }
}
int main()
{
    //some tests
}

并行归并排序不需要互斥锁。当然,您不需要为每次分区分割启动两个线程。启动一个线程;第二个分区在当前线程上处理;比起一个线程什么都不做,只是等待另外两个线程完成,这样可以更好地利用线程资源。

首先,简单的测试程序,排序2000万个无符号整数。注意:所有程序编译的苹果LLVM版本5.1 (clang-503.0.40)(基于LLVM 3.4svn), 64位,posix线程,优化设置在O2

测试程序

int main()
{
    using namespace std::chrono;
    
    std::random_device rd;
    std::mt19937 rng(rd());
    std::uniform_int_distribution<unsigned int> dist(0, std::numeric_limits<unsigned int>::max());
    
    std::vector<unsigned int> v, back(20*1000000);
    
    for (int i=0; i<5; ++i)
    {
        std::cout << "Generating...n";
        std::generate_n(back.begin(), back.size(), [&](){return dist(rng);});
        
        time_point<system_clock> t0, t1;
        
        v = back;
        std::cout << "std::sort: ";
        t0 = system_clock::now();
        std::sort(v.begin(), v.end());
        t1 = system_clock::now();
        std::cout << duration_cast<milliseconds>(t1-t0).count() << "msn";
        
        v = back;
        std::cout << "mergesort_mt1: ";
        t0 = system_clock::now();
        mergesort_mt1(v.begin(), v.end());
        t1 = system_clock::now();
        std::cout << duration_cast<milliseconds>(t1-t0).count() << "msn";
    }
    
    return 0;
}

并行归并排序

我们从一些超基本的东西开始。我们将并发线程的数量限制为标准库中报告的硬件并发数。一旦达到这个限制,我们就停止发出新的线程,而只是对现有的线程进行递归。一旦分布在硬件支持的线程上,这个平凡的算法会有令人惊讶的良好行为。

template<typename Iter>
void mergesort_mt1(Iter begin, Iter end,
                  unsigned int N = std::thread::hardware_concurrency()/2)
{
    auto len = std::distance(begin, end);
    if (len < 2)
        return;
    
    Iter mid = std::next(begin, len/2);
    if (N > 1)
    {
        auto fn = std::async(mergesort_mt1<Iter>, begin, mid, N-2);
        mergesort_mt1(mid, end, N-2);
        fn.wait();
    }
    else
    {
        mergesort_mt1(begin, mid, 0);
        mergesort_mt1(mid, end, 0);
    }
    
    std::inplace_merge(begin, mid, end);
}

Generating...
std::sort: 1902ms
mergesort_mt1: 1609ms
Generating...
std::sort: 1894ms
mergesort_mt1: 1584ms
Generating...
std::sort: 1881ms
mergesort_mt1: 1589ms
Generating...
std::sort: 1840ms
mergesort_mt1: 1580ms
Generating...
std::sort: 1841ms
mergesort_mt1: 1631ms

这看起来很有希望,但肯定可以改进。


并行合并+标准库排序

不同厂商的std::sort算法在实现上差异很大。标准的主要限制是它必须具有平均复杂度O(NlogN)。为了在性能方面实现这一点,许多std::sort算法是您可以在标准库中找到的最复杂、最疯狂的优化代码。我仔细阅读了一些具有几个内部排序特征的实现。我曾见过一个这样的实现,对较大的分区使用introsort(在递归深度受限之前使用快速排序,然后是堆排序),一旦到达较小的分区,就会屈服于庞大的手动展开的16个槽插入排序。

关键是,标准库的作者明白,一个通用的排序算法并不适合所有的算法。几个人经常被用来完成这项任务,经常和谐地一起工作。不要天真地认为你能打败他们;相反,通过利用他们的辛勤工作加入他们

修改代码很简单。我们对所有小于1025的分区使用std::sort。其余部分相同:

template<typename Iter>
void mergesort_mt2(Iter begin, Iter end,
                   unsigned int N = std::thread::hardware_concurrency())
{
    auto len = std::distance(begin, end);
    if (len <= 1024)
    {
        std::sort(begin,end);
        return;
    }
    
    Iter mid = std::next(begin, len/2);
    if (N > 1)
    {
        auto fn = std::async(mergesort_mt2<Iter>, begin, mid, N-2);
        mergesort_mt2(mid, end, N-2);
        fn.wait();
    }
    else
    {
        mergesort_mt2(begin, mid, 0);
        mergesort_mt2(mid, end, 0);
    }
    
    std::inplace_merge(begin, mid, end);
}

将新的测试用例添加到测试程序后,我们得到:

Generating...
std::sort: 1930ms
mergesort_mt1: 1695ms
mergesort_mt2: 998ms
Generating...
std::sort: 1854ms
mergesort_mt1: 1573ms
mergesort_mt2: 1030ms
Generating...
std::sort: 1867ms
mergesort_mt1: 1584ms
mergesort_mt2: 1005ms
Generating...
std::sort: 1862ms
mergesort_mt1: 1589ms
mergesort_mt2: 1001ms
Generating...
std::sort: 1847ms
mergesort_mt1: 1578ms
mergesort_mt2: 1009ms

OK。现在我们看到了一些令人印象深刻的东西。但我们能挤出更多吗?


并行归并+标准排序w/有限递归

如果您考虑一下,为了充分利用std::sort所提供的所有艰苦工作,我们可以在达到全部线程填充时简单地停止递归。如果发生这种情况,只需排序无论我们有std::sort和合并的东西在一起完成。虽然很难相信,但这实际上会降低代码的复杂性。我们的算法变成了一个简单的跨核分布分区,每个分区在时间到来时由std::sort处理:

template<typename Iter>
void mergesort_mt3(Iter begin, Iter end,
                   unsigned int N = std::thread::hardware_concurrency()/2)
{
    auto len = std::distance(begin, end);
    if (len <= 1024 || N < 2)
    {
        std::sort(begin,end);
        return;
    }
    
    Iter mid = std::next(begin, len/2);
    auto fn = std::async(mergesort_mt3<Iter>, begin, mid, N-2);
    mergesort_mt3(mid, end, N-2);
    fn.wait();
    std::inplace_merge(begin, mid, end);
}

再一次,将这个添加到我们的测试循环后…

Generating...
std::sort: 1911ms
mergesort_mt1: 1656ms
mergesort_mt2: 1006ms
mergesort_mt3: 802ms
Generating...
std::sort: 1854ms
mergesort_mt1: 1588ms
mergesort_mt2: 1008ms
mergesort_mt3: 806ms
Generating...
std::sort: 1836ms
mergesort_mt1: 1580ms
mergesort_mt2: 1017ms
mergesort_mt3: 806ms
Generating...
std::sort: 1843ms
mergesort_mt1: 1583ms
mergesort_mt2: 1006ms
mergesort_mt3: 853ms
Generating...
std::sort: 1855ms
mergesort_mt1: 1589ms
mergesort_mt2: 1012ms
mergesort_mt3: 798ms

如前所述,对于1024项或更小的分区,我们只委托给std::sort。如果分区更大,我们引入一个新线程来处理分割分区的一侧,使用当前线程处理另一侧。一旦我们达到线程限制N,我们就停止分裂,并简单地将所有事情委托给std::sort。简而言之,我们是std::sort的多线程分发前端。


总结

我们还有更多的子弹可以发射(使用一些元编程并假设一个固定的并发池数),但我把它留给你。

您可以显著地提高您的排序性能,如果您只是专注于分区,分配到线程直到被选中,利用高度优化的底层分区排序算法,然后将东西拼接在一起完成工作。还有改进的余地吗?当然可以。但是在上面给出的最简单的形式中,没有锁,没有互斥等等。在2011年中期一台配备4gB内存和双核i7处理器的MacBook Air上,最终样本和裸机std::sort的数据集相比,提高了58%。这是令人印象深刻的,考虑到它只需要很少的代码就可以完成,简单的简直太棒了

感谢您的回复。

互斥锁只保护unsigned int worker(一个全局变量),它跟踪产生了多少个线程。如果达到最大值(由maxthreads给出),则不再生成线程。您可以使用mergesort_mt2中的参数N来实现这一点。

你的机器有多少个内核?

性能似乎翻倍…

最新更新