在执行多线程处理时保存数据



有一个下载应用程序,它在多个线程中对下载项执行不同类型的处理。一些线程分析输入的数据,一些线程执行下载、提取、保存状态等。因此,每种类型的线程都对特定的数据成员进行操作,其中一些线程可以同时运行。下载项目可以这样描述:

class File;
class Download
{
public:
    enum State
    {
        Parsing, Downloading, Extracting, Repairing, Finished
    };
    Download(const std::string &filePath): filePath(filePath) { }
    void save()
    {
        // TODO: save data consistently
        StateFile f;    // state file for this download
        // save general download parameters
        f << filePath << state << bytesWritten << totalFiles << processedFiles;
        // Now we are to save the parameters of the files which belong to this download,
        // (!) but assume the downloading thread kicks in, downloads some data and 
        // changes the state of a file. That causes "bytesWritten", "processedFiles" 
        // and "state" to be different from what we have just saved.
        // When we finally save the state of the files their parameters don't match 
        // the parameters of the download (state, bytesWritten, processedFiles).
        for (File *f : files)
        {
            // save the file...
        }
    }
private:
    std::string filePath;
    std::atomic<State> state = Parsing;
    std::atomic<int> bytesWritten = 0;
    int totalFiles = 0;
    std::atomic<int> processedFiles = 0;
    std::mutex fileMutex;
    std::vector<File*> files;
};

我想知道如何一致地保存这些数据。例如,可能已经保存了已处理文件的状态和数量,我们将保存文件列表。同时,其他线程可能会改变文件的状态,从而改变已处理文件的数量或下载的状态,从而使保存的数据不一致。

我想到的第一个想法是为所有数据成员添加一个互斥锁,并在每次任何被访问时锁定它。但是,这可能是低效的,因为大多数时间线程访问不同的数据成员,并且在几分钟内只进行一次保存。

在我看来,这样的任务在多线程编程中是相当常见的,所以我希望有经验的人可以建议一个更好的方法。

我建议你使用生产者消费者模式。

下载器生成一个解析器并通知它消费,解析器生成一个提取器并通知它消费,提取器生成一个修复器。然后,每个任务都有一个队列。可以使用条件变量来优化同步,这样消费者只有在生产出某些东西后才会收到通知。你最终会使用更少的互斥锁和更可读和高效的设计。

这里是一个队列的示例代码,以及如何做,如果你必须同时下载,解析,提取和保存:

#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
template<typename T>
class synchronized_queu
{
public:
    T consume_one()
    {
        std::unique_lock<std::mutex> lock(lock_);
        while (queue_.size() == 0)
            condition_.wait(lock); //release and obtain again
        T available_data = queue_.front();
        queue_.pop();
        return available_data;
    }
    void produce_one(const T& data)
    {
        std::unique_lock<std::mutex> lock(lock_);
        queue_.push(data);
        condition_.notify_one();//notify only one or all as per your design...
    }
private:
    std::queue<T> queue_;
    std::mutex lock_;
    std::condition_variable condition_;
};
struct data
{
    //.....
};
void download(synchronized_queu<data>& q)
{
    //...
    data data_downloaded = ; //data downloaded;
    q.produce_one(data_downloaded);
}
void parse(synchronized_queu<data>& q1, synchronized_queu<data>& q2)
{
    //...
    data data_downloaded = q1.consume_one();
    //parse
    data data_parsed = ;//....
    q2.produce_one(data_parsed);
}
void extract(synchronized_queu<data>& q1, synchronized_queu<data>& q2)
{
    //...
    data data_parsed = q1.consume_one();
    //parse
    data data_extracted = ;//....
    q2.produce_one(data_extracted);
}
void save(synchronized_queu<data>& q)
{
    data data_extracted = q.consume_one();
    //save....
}
int main()
{
    synchronized_queu<data> dowlowded_queue;
    synchronized_queu<data> parsed_queue;
    synchronized_queu<data> extracted_queue;
    std::thread downloader(download, dowlowded_queue);
    std::thread parser(parse, dowlowded_queue, parsed_queue);
    std::thread extractor(extract, parsed_queue, extracted_queue);
    std::thread saver(save, extracted_queue);
    while (/*condition to stop program*/)
    {
    }
    downloader.join();
    parser.join();
    extractor.join();
    saver.join();
    return 0;
}

相关内容

  • 没有找到相关文章

最新更新