如何消除并行std::transform_reduce()的中间容器



我经常需要计算 Sum( f(i), 1, N ) 或 Product( f(i), 1, N ),其中 f(i) 是 CPU 密集型的计算,而整数 i 取自一个连续但非常大的范围。

使用C++20编译器,我可以编写函数:

// Computes Sum(f(i)) for i = 1..N with a parallel std::transform_reduce.
//
// The indices 1..N are first written into a std::vector of N elements, which
// is then used as the input range of the reduction; memory use therefore
// grows linearly with N.
//
// N: number of terms; the indices run from 1 to N inclusive.
// Returns the reduced sum. With the placeholder body below every term is 0.
uint64_t solution(uint64_t N)
{
    std::vector<uint64_t> v(N);
    std::iota(v.begin(), v.end(), 1ULL);
    return std::transform_reduce(
        std::execution::par,
        v.cbegin(), v.cend(),
        uint64_t{0},  // accumulator has exactly the return type
        std::plus<>(),
        [](const uint64_t& i) -> uint64_t {
            uint64_t result(0);
            // expensive computation of result=f(i) goes here
            // ...
            return result;
        });
}

但这将受到RAM的限制。

我如何仅使用C++20 STL(即不使用特定于供应商的库或第三方库),在运行时完全消除输入向量带来的中间内存操作,同时保持高效的并行执行?

免责声明:我以前没有实现迭代器或C++20 的经验

这在我的gcc 10.1和-std=c++2a下似乎可以工作。我是在很短的时间内完成的,没有花太多心思,所以实现当然还可以改进,哪怕只是把它模板化。如果把operator<=>换成旧式的二元比较运算符,这段代码应该也能在C++17下运行,但我还没有测试过。如果你发现任何错误或容易纠正的设计缺陷,欢迎在下面评论,以便改进该答案。

#include <cstddef>
#if __cplusplus > 201703L
#include <compare>
#endif
#include <execution>
#include <iostream>
#include <iterator>
#include <numeric>
/// Iterator whose "elements" are the integers themselves: dereferencing
/// yields the current counter value, so the range
/// [counting_iterator(a), counting_iterator(b)) enumerates a, a+1, ..., b-1
/// without any backing container.
///
/// Every non-member operator is defined inside the class as a friend. Such
/// definitions are implicitly inline, so the class can be placed in a header
/// without causing multiple-definition errors, and each operator's exception
/// specification is stated in exactly one place.
class counting_iterator {
public:
  using difference_type = std::ptrdiff_t;
  using value_type = std::ptrdiff_t;
  using pointer = void;  // there is no operator->, nothing to point at
  // operator* hands out the counter by value, so that is the type
  // iterator_traits<counting_iterator>::reference has to name.
  using reference = value_type;
  using iterator_category = std::random_access_iterator_tag;

private:
  value_type val_{0};

public:
  /// Creates an iterator positioned at 0.
  counting_iterator() = default;

  /// Creates an iterator positioned at `init`.
  explicit counting_iterator(value_type init) noexcept : val_{init} {}

  /// Returns the current counter value.
  reference operator*() const noexcept { return val_; }

  /// Returns the value `index` steps away from the current position.
  reference operator[](difference_type index) const noexcept {
    return val_ + index;
  }

  counting_iterator &operator++() noexcept {
    ++val_;
    return *this;
  }
  counting_iterator operator++(int) noexcept {
    counting_iterator res{*this};
    ++(*this);
    return res;
  }
  counting_iterator &operator--() noexcept {
    --val_;
    return *this;
  }
  counting_iterator operator--(int) noexcept {
    counting_iterator res{*this};
    --(*this);
    return res;
  }

  counting_iterator &operator+=(difference_type offset) noexcept {
    val_ += offset;
    return *this;
  }
  counting_iterator &operator-=(difference_type offset) noexcept {
    val_ -= offset;
    return *this;
  }

  /// Iterator arithmetic; the operands are left unmodified.
  friend counting_iterator operator+(counting_iterator const &it,
                                     difference_type const &offset) noexcept {
    return counting_iterator{it.val_ + offset};
  }
  friend counting_iterator operator+(difference_type const &offset,
                                     counting_iterator const &it) noexcept {
    return counting_iterator{it.val_ + offset};
  }
  friend counting_iterator operator-(counting_iterator const &it,
                                     difference_type const &offset) noexcept {
    return counting_iterator{it.val_ - offset};
  }

  /// Number of increments needed to get from `b` to `a`.
  friend difference_type operator-(counting_iterator const &a,
                                   counting_iterator const &b) noexcept {
    return a.val_ - b.val_;
  }

  friend bool operator==(counting_iterator const &a,
                         counting_iterator const &b) noexcept {
    return a.val_ == b.val_;
  }
#if __cplusplus > 201703L
  // C++20: the remaining comparisons are synthesized from == and <=>.
  friend std::strong_ordering operator<=>(counting_iterator const &a,
                                          counting_iterator const &b) noexcept {
    return a.val_ <=> b.val_;
  }
#else
  friend bool operator!=(counting_iterator const &a,
                         counting_iterator const &b) noexcept {
    return a.val_ != b.val_;
  }
  friend bool operator<=(counting_iterator const &a,
                         counting_iterator const &b) noexcept {
    return a.val_ <= b.val_;
  }
  friend bool operator>=(counting_iterator const &a,
                         counting_iterator const &b) noexcept {
    return a.val_ >= b.val_;
  }
  friend bool operator<(counting_iterator const &a,
                        counting_iterator const &b) noexcept {
    return a.val_ < b.val_;
  }
  friend bool operator>(counting_iterator const &a,
                        counting_iterator const &b) noexcept {
    return a.val_ > b.val_;
  }
#endif
};
// Demo: sums i*i for i in [0, 10) by running std::transform_reduce directly
// over a pair of counting_iterators, then prints the result.
int main() {
  const auto res = std::transform_reduce(
      std::execution::par,
      counting_iterator(0), counting_iterator(10),
      // The init value fixes the accumulator type. Use the iterator's value
      // type rather than `long`, which is only 32 bits wide on LLP64 targets.
      std::ptrdiff_t{0},
      std::plus<>(),
      [](std::ptrdiff_t i) { return i * i; });
  std::cout << res << '\n';
}

编辑:我对这个类进行了修改,使它也能在C++17下使用。现在,它还显式地为std::random_access_iterator_tag定义了类型别名。但无论是使用迭代器还是向量,我仍然无法通过该执行策略获得任何并行计算,所以我不知道类本身是否有什么东西阻止了并行执行。

经过一些调整和实验,我确认基于Paul上面示例的双向迭代器可以正常工作:

/// Iterator that yields consecutive integers: dereferencing returns the
/// current counter value, so [counting_iterator(a), counting_iterator(b))
/// enumerates a, a+1, ..., b-1 without any backing container.
///
/// It is tagged as a bidirectional iterator, although offset arithmetic,
/// indexing and ordering comparisons are provided as well.
class counting_iterator {
public:
    using iterator_category = std::bidirectional_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = std::ptrdiff_t;
    // All five member types are declared so that std::iterator_traits is
    // populated in C++17 as well as in C++20.
    using pointer = void;          // there is no operator->
    using reference = value_type;  // operator* returns the counter by value

private:
    value_type val_{0};

public:
    /// Creates an iterator positioned at 0.
    counting_iterator() = default;

    /// Creates an iterator positioned at `init`.
    explicit counting_iterator(value_type init) noexcept : val_(init) {}

    /// Returns the current counter value (by value, never a reference into
    /// the iterator object).
    reference operator*() const noexcept { return val_; }

    counting_iterator& operator++() noexcept { ++val_; return *this; }
    counting_iterator operator++(int) noexcept { counting_iterator res{ *this }; ++(*this); return res; }
    counting_iterator& operator--() noexcept { --val_; return *this; }
    counting_iterator operator--(int) noexcept { counting_iterator res{ *this }; --(*this); return res; }

    /// Returns the value `index` steps away from the current position.
    reference operator[](difference_type index) const noexcept { return val_ + index; }

    counting_iterator& operator+=(difference_type offset) noexcept { val_ += offset; return *this; }
    counting_iterator& operator-=(difference_type offset) noexcept { val_ -= offset; return *this; }

    // `it + n` and `it - n` work on a copy and leave `it` untouched.
    counting_iterator operator+(difference_type offset) const noexcept { return counting_iterator{ *this } += offset; }
    // Deliberately disabled: this overload forwards to operator+=, so
    // evaluating `it + n` would modify `it` itself and return a reference to it.
    /*counting_iterator& operator+(difference_type offset) noexcept { return operator+=(offset); }*/
    counting_iterator operator-(difference_type offset) const noexcept { return counting_iterator{ *this } -= offset; }
    // Deliberately disabled for the same reason (forwards to operator-=).
    /*counting_iterator& operator-(difference_type offset) noexcept { return operator-=(offset); }*/

    /// Number of increments needed to get from `other` to `*this`.
    difference_type operator-(counting_iterator const& other) const noexcept { return val_ - other.val_; }

    bool operator<(counting_iterator const& b) const noexcept { return val_ < b.val_; }
    bool operator==(counting_iterator const& b) const noexcept { return val_ == b.val_; }
    bool operator!=(counting_iterator const& b) const noexcept { return !operator==(b); }
#if __cplusplus > 201703L
    // C++20: >, <= and >= are synthesized from <=>.
    std::strong_ordering operator<=>(counting_iterator const& b) const noexcept { return val_ <=> b.val_; }
#else
    bool operator>(counting_iterator const& b) const noexcept { return val_ > b.val_; }
    bool operator<=(counting_iterator const& b) const noexcept { return val_ <= b.val_; }
    bool operator>=(counting_iterator const& b) const noexcept { return val_ >= b.val_; }
#endif
};

虽然std::transform_reduce确实在并行执行,但我无法让它在iterator_category = std::random_access_iterator_tag的情况下正常工作,我认为这就是性能下降的原因。

UPD:在上面的代码中,被注释掉的那几行(如果启用)会使MS编译器优先选择它们,而不是返回副本的版本;如果迭代器被标记为std::random_access_iterator_tag,这会在并行执行过程中造成严重破坏。

最新更新