我正在处理一个C++项目,该项目需要在线程池中运行许多作业。这些作业很容易失败,这意味着我需要知道每个作业在完成后是如何终止的。在大多数情况下,作为一名Java程序员,我喜欢使用"futures"或类似范式的想法,类似于Java的util.concurrent包中的各种类。
我有两个问题:第一,C++是否已经存在这样的东西(我在Boost中没有找到任何东西,但可能我没有足够认真地寻找);其次,这对C++来说是一个合理的想法吗?
我在这里找到了一个我试图实现的简单例子:
http://www.boostcookbook.com/Recipe:/1234841
这种方法有意义吗?
期货既存在于即将推出的标准(C++0x)中,也存在于内部boost中。请注意,虽然主名称future
是相同的,但您需要阅读文档来定位其他类型并理解其语义。我不知道Java的未来,所以我不能告诉你它们在哪里不同,如果它们不同的话
boost中的库是由Anthony Williams编写的,我相信他也参与了该部分标准的定义。他还编写了C++并发操作,其中包括对未来、任务、承诺和相关对象的良好描述。如果您感兴趣的话,他的公司还销售一个完整的、最新实现的C++0x线程库。
请注意,当您在boost::unique_future
上调用get()
方法时,它将重新抛出异步执行期间可能存储在其中的任何异常。
我建议你做一些类似的事情:
#pragma once
#include <tbb/concurrent_queue.h>
#include <boost/thread.hpp>
#include <boost/noncopyable.hpp>
#include <functional>
namespace internal
{
template<typename T>
struct move_on_copy
{
move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}
move_on_copy(T&& value) : value(std::move(value)){}
mutable T value;
};
template<typename T>
move_on_copy<T> make_move_on_copy(T&& value)
{
return move_on_copy<T>(std::move(value));
}
}
class executor : boost::noncopyable
{
boost::thread thread_;
tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;
template<typename Func>
auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
{
typedef boost::packaged_task<decltype(func())> task_type;
auto task = task_type(std::forward<Func>(func));
task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
{
try
{
if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
my_task();
}
catch(boost::task_already_started&){}
}));
return std::move(task);
}
public:
explicit executor() // noexcept
{
thread_ = boost::thread([this]{run();});
}
~executor() // noexcept
{
execution_queue_.push(nullptr); // Wake the execution thread.
thread_.join();
}
template<typename Func>
auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept
{
// Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
auto task_adaptor = internal::make_move_on_copy(create_task(func));
auto future = task_adaptor.value.get_future();
execution_queue_.push([=]
{
try{task_adaptor.value();}
catch(boost::task_already_started&){}
});
return std::move(future);
}
template<typename Func>
auto invoke(Func&& func) -> decltype(func()) // noexcept
{
if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
return func();
return begin_invoke(std::forward<Func>(func), prioriy).get();
}
private:
void run() // noexcept
{
while(true)
{
std::function<void()> func;
execution_queue_.pop(func);
if(!func)
break;
func();
}
}
};
C++模板的限制性比Java Generics小,因此‘Future’可以很容易地与它们和线程同步原语一起移植。至于支持这种机制的现有库,希望其他人知道。