I'm trying to create a method using a thread pool, but when I start using this function the results come out wrong. I think it's a data race, because every thread tries to write to freq_dictionary at the same time, but I don't know how to fix it :(
void InvertedIndex::UpdateDocumentBase(std::vector<std::string> input_docs) {
    boost::asio::thread_pool pool(4);
    for (int i = 0; i < input_docs.size(); ++i) {
        boost::asio::post(pool, boost::bind<void>([&, i] {
            std::stringstream ss(input_docs[i]);
            std::string word;
            while (ss >> word) {
                std::transform(word.begin(), word.end(), word.begin(), tolower);
                if (freq_dictionary.find(word) == freq_dictionary.end()) {
                    std::pair<std::string, std::vector<Entry>> pair;
                    pair.first = word;
                    Entry curEntry{static_cast<size_t>(i), 1};
                    std::vector<Entry> entryVec;
                    entryVec.push_back(curEntry);
                    pair.second = entryVec;
                    freq_dictionary.insert(pair);
                } else {
                    bool added = false;
                    for (auto& entry : freq_dictionary[word]) {
                        if (entry.doc_id == static_cast<size_t>(i)) {
                            entry.count++;
                            added = true;
                            break;
                        }
                    }
                    if (!added) {
                        Entry newEntry{static_cast<size_t>(i), 1};
                        freq_dictionary[word].push_back(newEntry);
                    }
                }
            }
        }));
    }
    pool.join();
}
I get this "Expression: _CrtIsValidHeapPointer(block)" in the bad cases, or just wrong answers. If I use only 1 thread the results are correct, but I'd like to make it work with the thread pool.
The desired parallelization won't be particularly fast(er), because of the locking. In fact, it may well be slower, depending on the nature of the data.

If adding new words is a pretty rare occurrence, even within a document, you might be happy just locking around insertions and making the counters atomic. At the very least, also move the input documents into the posted tasks. I've renamed freq_dictionary to histo here:
Live On Coliru
#include <algorithm>
#include <atomic>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/asio.hpp>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <map>
#include <mutex>
#include <numeric>
#include <ranges>
#include <sstream>
#include <string_view>
#include <vector>
struct InvertedIndex {
    void UpdateDocumentBase(std::vector<std::string> input_docs);

    struct Entry {
        size_t doc_id;
        std::atomic_size_t count;
        Entry(size_t doc_id, size_t count) : doc_id(doc_id), count(count) {}
        Entry(Entry const& rhs) : doc_id(rhs.doc_id), count(rhs.count.load()) {}
    };
    using Entries = std::vector<Entry>;

    std::map<std::string, Entries> histo;
    std::mutex _mx;
};
void InvertedIndex::UpdateDocumentBase(std::vector<std::string> input_docs) {
    constexpr int N = 4;
    boost::asio::thread_pool pool(N);

    for (size_t i = 0; i < input_docs.size(); ++i) {
        // move the document into the task so the task owns its input
        post(pool, [doc = std::move(input_docs[i]), this, i] {
            std::stringstream ss(std::move(doc));

            for (std::string word; ss >> word;) {
                boost::algorithm::to_lower(word);

                if (auto hit = histo.find(word); hit != histo.end()) {
                    Entries& ee = hit->second;
                    auto eit = find_if(begin(ee), end(ee),
                                       [i](Entry const& e) { return e.doc_id == i; });
                    if (eit != end(ee)) {
                        eit->count++; // atomic increment, no lock needed
                    } else {
                        std::lock_guard lk(_mx); // lock only on insertion
                        ee.push_back(Entry{i, 1});
                    }
                } else {
                    std::lock_guard lk(_mx); // lock only on insertion
                    histo.emplace(word, Entries{{i, 1}});
                }
            }
        });
    }

    pool.join();
}
int main(int argc, char** argv) {
    std::vector<std::string> input_docs;
    {
        std::vector const fnames(argv + 1, argv + argc);
        size_t total_size = 0;
        for (std::filesystem::path fname : fnames) {
            std::ifstream ifs(fname);
            input_docs.push_back({std::istreambuf_iterator<char>(ifs), {}});
            total_size += input_docs.back().size();
        }
        std::cout << "Read " << fnames.size() << " documents ("
                  << (total_size >> 10) << "KiB)\n";
    }

    InvertedIndex ii;
    ii.UpdateDocumentBase(std::move(input_docs));

    struct Occurrence {
        std::string_view w;
        size_t n;
    };
    std::vector<Occurrence> occ;

    namespace v = std::views;
    for (auto& [w, entries] : ii.histo) {
        auto counts = entries | v::transform(&InvertedIndex::Entry::count);
        auto total = std::accumulate(begin(counts), end(counts), 0);
        occ.emplace_back(w, total);
    }

    auto nth = occ.begin() + 10;
    std::ranges::partial_sort(occ, nth, std::greater<>{}, &Occurrence::n);

    for (auto& [w, n] : occ | v::take(10))
        std::cout << "Occurs " << n << " times: " << std::quoted(w) << "\n";
}
Prints (for 55 random Coliru sample programs):
Read 55 documents (49KiB)
Occurs 303 times: "{"
Occurs 255 times: "<<"
Occurs 213 times: "}"
Occurs 190 times: "="
Occurs 128 times: "#include"
Occurs 122 times: "int"
Occurs 90 times: "return"
Occurs 84 times: "//"
Occurs 83 times: "};"
Occurs 71 times: "const"
real 0m0.030s
user 0m0.008s
sys 0m0.052s
Getting smarter
I'd suggest making the loops coarser-grained, and potentially merging the results on completion. OpenMP or even the std::execution parallel algorithms make this a lot easier than raw threads; see the sketch below.
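For illustration only, here is a minimal sketch of that idea, assuming C++17 parallel algorithms are available (on GCC this needs TBB, i.e. -ltbb). It is not code from the answer above, and the names (BuildHisto, partial) are made up: each document gets a private word-count map built in parallel, and the per-document results are merged on one thread afterwards, so no locks or atomics are needed at all.

#include <algorithm>
#include <boost/algorithm/string/case_conv.hpp>
#include <execution>
#include <map>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

struct Entry { size_t doc_id, count; };
using Entries = std::vector<Entry>;

std::map<std::string, Entries> BuildHisto(std::vector<std::string> const& docs) {
    using Local = std::map<std::string, size_t>; // word -> count, for one doc
    std::vector<Local> partial(docs.size());

    std::vector<size_t> ids(docs.size());
    std::iota(ids.begin(), ids.end(), size_t{0});

    // Parallel pass: task i writes only partial[i], so nothing is shared
    std::for_each(std::execution::par, ids.begin(), ids.end(), [&](size_t i) {
        std::stringstream ss(docs[i]);
        for (std::string word; ss >> word;) {
            boost::algorithm::to_lower(word);
            ++partial[i][word];
        }
    });

    // Single-threaded merge; doc ids are distinct per task, so just append
    std::map<std::string, Entries> histo;
    for (size_t i = 0; i < partial.size(); ++i)
        for (auto const& [word, n] : partial[i])
            histo[word].push_back(Entry{i, n});
    return histo;
}

Because task i only ever touches partial[i], the parallel pass is race-free by construction. The trade-off is the extra memory for the per-document maps and the sequential merge at the end, which is typically cheap compared to the tokenizing work.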