如何将并发解决方案应用于类似生产者-消费者的情况



我有一个包含节点序列的XML文件。每个节点代表一个元素,我需要解析并将其添加到排序列表中(顺序必须与文件中找到的节点相同)。

目前我正在使用顺序解决方案:

struct Graphic
{
    bool parse()
    {
        // parsing...
        return parse_outcome;
    }
};
vector<unique_ptr<Graphic>> graphics;
void producer()
{
    for (size_t i = 0; i < N_GRAPHICS; i++)
    {
        auto g = new Graphic();
        if (g->parse())
            graphics.emplace_back(g);
        else
            delete g;
    }
}

所以,只有当图形(实际上是从Graphic, Line, Rectangle等派生的类的实例,这就是为什么new)可以正确解析时,它将被添加到我的数据结构中。

因为我只关心这些图形添加到列表中的顺序,所以我想异步调用parse方法,这样生产者的任务是从文件中读取每个节点并将该图形添加到数据结构中,而消费者的任务是在准备解析新图形时解析每个图形。

现在我有几个消费者线程(在main中创建),我的代码如下所示:

queue<pair<Graphic*, size_t>> q;
mutex m;
atomic<size_t> n_elements;
void producer()
{
    for (size_t i = 0; i < N_GRAPHICS; i++)
    {
        auto g = new Graphic();
        graphics.emplace_back(g);
        q.emplace(make_pair(g, i));
    }
    n_elements = graphics.size();
}
void consumer()
{
    pair<Graphic*, size_t> item;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lk(m);
            if (n_elements == 0)
                return;
            n_elements--;
            item = q.front();
            q.pop();
        }
        if (!item.first->parse())
        {
            // here I should remove the item from the vector
            assert(graphics[item.second].get() == item.first);
            delete item.first;
            graphics[item.second] = nullptr;
        }
    }
}

我首先在main中运行producer,这样当第一个consumer启动时,队列已经完全满了。

int main()
{
    producer();
    vector<thread> threads;
    for (auto i = 0; i < N_THREADS; i++)
        threads.emplace_back(consumer);
    for (auto& t : threads)
        t.join();
    return 0;
}

并发版本似乎至少比原始版本快一倍。完整的代码已上传到这里。

现在我想知道:

  • 是否有任何(同步)错误在我的代码?
  • 是否有一种方法可以更快(或更好)地达到相同的结果?

另外,我注意到在我的计算机上,如果我将线程数设置为8,我将获得最佳结果(就运行时间而言)。更多(或更少)线程会给我带来最差的结果。为什么?

引用没有同步错误,但我认为内存管理可以更好,因为你的代码泄露,如果parse()抛出一个异常。

没有同步错误,但我认为您的内存管理可以更好,因为如果parse()抛出异常,您将有泄漏。

引用是否有一种方法可以更快(或更好)达到相同的结果?

。您可以使用线程池和lambda的简单实现来为您执行parse()。

下面的代码说明了这种方法。我使用线程池实现

#include <iostream>
#include <stdexcept>
#include <vector>
#include <memory>
#include <chrono>
#include <utility>
#include <cassert>
#include <ThreadPool.h>
using namespace std;
using namespace std::chrono;

#define N_GRAPHICS        (1000*1000*1)
#define N_THREADS       8

struct Graphic;
using GPtr = std::unique_ptr<Graphic>;
static vector<GPtr> graphics;
struct Graphic
{
    Graphic()
        : status(false)
    {
    }

    bool parse()
    {
        // waste time
        try
        {
            throw runtime_error("");
        } 
        catch (runtime_error)
        {
        }
        status = true;
        //return false;
        return true;
    }

    bool status;
};

int main()
{
    auto start = system_clock::now();
    auto producer_unit = []()-> GPtr {
        std::unique_ptr<Graphic> g(new Graphic);
        if(!g->parse()){
            g.reset(); // if g don't parse, return nullptr
        }
        return g;        
    };
    using ResultPool = std::vector<std::future<GPtr>>;
    ResultPool results;
    // ThreadPool pool(thread::hardware_concurrency());
    ThreadPool pool(N_THREADS);
    for(int i = 0; i <N_GRAPHICS; ++i){
     // Running async task
     results.emplace_back(pool.enqueue(producer_unit));
    }
   for(auto &t : results){
        auto value = t.get();
        if(value){
          graphics.emplace_back(std::move(value));
        }
    }
    auto duration = duration_cast<milliseconds>(system_clock::now() - start);
    cout << "Elapsed: " << duration.count() << endl;
    for (size_t i = 0; i < graphics.size(); i++)
    {
        if (!graphics[i]->status)
        {
            cerr << "Assertion failed! (" << i << ")" << endl;
            break;
        }
    }
    cin.get();
    return 0;
}

它在我的机器上更快一点(15秒),更可读,并且消除了共享数据的必要性(同步是邪恶的,避免它或以可靠和有效的方式隐藏它)。

最新更新