我正在创建一种机制,允许用户使用装饰器模式从基本构建块形成任意复杂函数。这在功能方面工作得很好,但我不喜欢它涉及大量虚拟调用的事实,尤其是当嵌套深度变大时。这让我担心,因为复杂函数可能会经常调用(>100.000 次(。
为了避免这个问题,我试图在完成后将装饰器方案变成一个std::function
(cfr。 to_function()
在SSCCE中(。所有内部函数调用都在std::function
构建期间连接。我认为这将比原始装饰器方案更快地进行评估,因为不需要在std::function
版本中执行虚拟查找。
唉,基准测试证明我错了:装饰器方案实际上比我从它构建的std::function
更快。所以现在我想知道为什么。也许我的测试设置有问题,因为我只使用两个微不足道的基本函数,这意味着 vtable 查找可能会被缓存?
我使用的代码包含在下面,不幸的是它很长。
SSCCE
// sscce.cpp
#include <iostream>
#include <vector>
#include <memory>
#include <functional>
#include <random>
/**
* Base class for Pipeline scheme (implemented via decorators)
*/
class Pipeline {
protected:
std::unique_ptr<Pipeline> wrappee;
Pipeline(std::unique_ptr<Pipeline> wrap)
:wrappee(std::move(wrap)){}
Pipeline():wrappee(nullptr){}
public:
typedef std::function<double(double)> FnSig;
double operator()(double input) const{
if(wrappee.get()) input=wrappee->operator()(input);
return process(input);
}
virtual double process(double input) const=0;
virtual ~Pipeline(){}
// Returns a std::function which contains the entire Pipeline stack.
virtual FnSig to_function() const=0;
};
/**
* CRTP for to_function().
*/
template <class Derived>
class Pipeline_CRTP : public Pipeline{
protected:
Pipeline_CRTP(const Pipeline_CRTP<Derived> &o):Pipeline(o){}
Pipeline_CRTP(std::unique_ptr<Pipeline> wrappee)
:Pipeline(std::move(wrappee)){}
Pipeline_CRTP():Pipeline(){};
public:
typedef typename Pipeline::FnSig FnSig;
FnSig to_function() const override{
if(Pipeline::wrappee.get()!=nullptr){
FnSig wrapfun = Pipeline::wrappee->to_function();
FnSig processfun = std::bind(&Derived::process,
static_cast<const Derived*>(this),
std::placeholders::_1);
FnSig fun = [=](double input){
return processfun(wrapfun(input));
};
return std::move(fun);
}else{
FnSig processfun = std::bind(&Derived::process,
static_cast<const Derived*>(this),
std::placeholders::_1);
FnSig fun = [=](double input){
return processfun(input);
};
return std::move(fun);
}
}
virtual ~Pipeline_CRTP(){}
};
/**
* First concrete derived class: simple scaling.
*/
class Scale: public Pipeline_CRTP<Scale>{
private:
double scale_;
public:
Scale(std::unique_ptr<Pipeline> wrap, double scale) // todo move
:Pipeline_CRTP<Scale>(std::move(wrap)),scale_(scale){}
Scale(double scale):Pipeline_CRTP<Scale>(),scale_(scale){}
double process(double input) const override{
return input*scale_;
}
};
/**
* Second concrete derived class: offset.
*/
class Offset: public Pipeline_CRTP<Offset>{
private:
double offset_;
public:
Offset(std::unique_ptr<Pipeline> wrap, double offset) // todo move
:Pipeline_CRTP<Offset>(std::move(wrap)),offset_(offset){}
Offset(double offset):Pipeline_CRTP<Offset>(),offset_(offset){}
double process(double input) const override{
return input+offset_;
}
};
int main(){
// used to make a random function / arguments
// to prevent gcc from being overly clever
std::default_random_engine generator;
auto randint = std::bind(std::uniform_int_distribution<int>(0,1),std::ref(generator));
auto randdouble = std::bind(std::normal_distribution<double>(0.0,1.0),std::ref(generator));
// make a complex Pipeline
std::unique_ptr<Pipeline> pipe(new Scale(randdouble()));
for(unsigned i=0;i<100;++i){
if(randint()) pipe=std::move(std::unique_ptr<Pipeline>(new Scale(std::move(pipe),randdouble())));
else pipe=std::move(std::unique_ptr<Pipeline>(new Offset(std::move(pipe),randdouble())));
}
// make a std::function from pipe
Pipeline::FnSig fun(pipe->to_function());
double bla=0.0;
for(unsigned i=0; i<100000; ++i){
#ifdef USE_FUNCTION
// takes 110 ms on average
bla+=fun(bla);
#else
// takes 60 ms on average
bla+=pipe->operator()(bla);
#endif
}
std::cout << bla << std::endl;
}
基准
使用pipe
:
g++ -std=gnu++11 sscce.cpp -march=native -O3
sudo nice -3 /usr/bin/time ./a.out
-> 60 ms
使用fun
:
g++ -DUSE_FUNCTION -std=gnu++11 sscce.cpp -march=native -O3
sudo nice -3 /usr/bin/time ./a.out
-> 110 ms
你有std::function
绑定 lambda 调用std::function
s 绑定 lamdbas 调用std::function
s ...
看看你的to_function
.它创建一个调用两个std::function
的 lambda,并返回绑定到另一个std::function
的 lambda 。编译器不会静态解析其中任何一个问题。
虚函数解决方案一样多的间接调用结束,也就是说,如果您摆脱绑定processfun
并直接在 lambda 中调用它。否则,您将拥有两倍的数量。
如果要加速,则必须以可以静态解析的方式创建整个管道,这意味着在最终将类型擦除为单个std::function
之前,需要更多模板。
正如 Sebastian Redl 的回答所说,您对虚函数的"替代方案"通过动态绑定函数(虚拟或通过函数指针,取决于std::function
实现(添加了几层间接寻址,然后它仍然调用虚拟Pipeline::process(double)
函数!
此修改通过删除一层std::function
间接寻址并防止对Derived::process
的调用是虚拟的,使其速度显著加快:
FnSig to_function() const override {
FnSig fun;
auto derived_this = static_cast<const Derived*>(this);
if (Pipeline::wrappee) {
FnSig wrapfun = Pipeline::wrappee->to_function();
fun = [=](double input){
return derived_this->Derived::process(wrapfun(input));
};
} else {
fun = [=](double input){
return derived_this->Derived::process(input);
};
}
return fun;
}
不过,与虚拟功能版本相比,这里还有更多的工作要做。
std::function
是出了名的慢;类型擦除和由此产生的分配也在其中起作用,在gcc
中,调用的内联/优化很糟糕。出于这个原因,存在过多的C++"代表",人们试图用他们来解决这个问题。我把一个移植到代码审查:
https://codereview.stackexchange.com/questions/14730/impossibly-fast-delegate-in-c11
但是你可以用谷歌找到很多其他人,或者写你自己的。
编辑:
这些天,在这里寻找一个快速的代表。
libstdc++ 对 std::function 的实现大致如下:
template<typename Signature>
struct Function
{
Ptr functor;
Ptr functor_manager;
template<class Functor>
Function(const Functor& f)
{
functor_manager = &FunctorManager<Functor>::manage;
functor = new Functor(f);
}
Function(const Function& that)
{
functor = functor_manager(CLONE, that->functor);
}
R operator()(args) // Signature
{
return functor_manager(INVOKE, functor, args);
}
~Function()
{
functor_manager(DESTROY, functor);
}
}
template<class Functor>
struct FunctorManager
{
static manage(int operation, Functor& f)
{
switch (operation)
{
case CLONE: call Functor copy constructor;
case INVOKE: call Functor::operator();
case DESTROY: call Functor destructor;
}
}
}
因此,尽管std::function
不知道 Functor 对象的确切类型,但它通过 functor_manager 函数指针调度重要操作,该指针是知道Functor
类型的模板实例的静态函数。
每个std::function
实例都将在堆上分配自己拥有的函子对象副本(除非它不大于指针,例如函数指针,在这种情况下,它只是将指针作为子对象保存(。
重要的一点是,如果底层函子对象具有昂贵的复制构造函数和/或占用大量空间(例如保存绑定参数(,则复制std::function
的成本很高。