是否可以对boost::asio中的条件变量执行异步等待(读取:非阻塞)?如果不直接支持,请提供实现它的提示。
我可以实现一个计时器,甚至每隔几毫秒触发一次唤醒,但这种方法非常低劣,我发现很难相信没有实现/记录条件变量同步。
如果我理解的意图正确,你想启动一个事件处理程序,当一些条件变量是信号,在一个线程池的上下文中?我认为等待处理程序开始时的条件变量,以及io_service::post()本身最终回到池中就足够了,类似于:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
boost::unique_lock<boost::mutex> lk(mx);
cv.wait(lk);
std::cout << "handler awakenedn";
io.post(handler);
}
void buzzer()
{
for(;;)
{
boost::this_thread::sleep(boost::posix_time::seconds(1));
boost::lock_guard<boost::mutex> lk(mx);
cv.notify_all();
}
}
int main()
{
io.post(handler);
boost::thread bt(buzzer);
io.run();
}
我可以建议基于boost::asio::deadline_timer的解决方案,这对我来说很好。这是boost::asio环境中的一种异步事件。一个非常重要的事情是,'handler'必须通过与'cancel'相同的'strand_'来序列化,因为在多个线程中使用'boost::asio::deadline_timer'不是线程安全的。
class async_event
{
public:
async_event(
boost::asio::io_service& io_service,
boost::asio::strand<boost::asio::io_context::executor_type>& strand)
: strand_(strand)
, deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
{}
// 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
// because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
template<class WaitHandler>
void async_wait(WaitHandler&& handler) {
deadline_timer_.async_wait(handler);
}
void async_notify_one() {
boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
}
void async_notify_all() {
boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
}
private:
void async_notify_one_serialized() {
deadline_timer_.cancel_one();
}
void async_notify_all_serialized() {
deadline_timer_.cancel();
}
boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
boost::asio::deadline_timer deadline_timer_;
};
遗憾的是,Boost ASIO没有async_wait_for_condvar()
方法。
当修改共享资源时,经典的部分同步线程方式如下:
- 锁定保护资源的互斥锁
- 更新任何需要更新的
- 通知一个条件变量,如果等待线程需要进一步处理
- 解锁互斥锁
完全异步的ASIO方式是:
- 生成包含更新资源所需的所有内容的消息
- 发送一个调用更新处理程序的消息到资源链
- 如果需要进一步处理,让更新处理程序创建更多的消息并将它们发布到适当的资源链。
- 如果作业可以在完全私有的数据上执行,那么直接将它们发布到io-context中。
下面是一个类some_shared_resource
的例子,它接收一个字符串state
,并根据接收到的状态触发一些进一步的处理。请注意,私有方法some_shared_resource::receive_state()
中的所有处理都是完全线程安全的,因为链将所有调用序列化。
当然,这个例子是不完整的;some_other_resource
需要一个类似some_shared_ressource::send_state()
的send_code_red()
方法。
#include <boost/asio>
#include <memory>
using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;
class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
asio_strand strand;
std::shared_ptr<some_other_resource> other;
std::string state;
void receive_state(std::string&& new_state) {
std::string oldstate = std::exchange(state, new_state);
if(state == "red" && oldstate != "red") {
// state transition to "red":
other.send_code_red(true);
} else if(state != "red" && oldstate == "red") {
// state transition from "red":
other.send_code_red(false);
}
}
public:
some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
: strand(ctx.get_executor()), other(other) {}
void send_state(std::string&& new_state) {
boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
if(auto self = me.lock(); self) {
self->receive_state(std::move(new_state));
}
});
}
};
正如你所看到的,总是在ASIO的链中发布一开始可能有点乏味。但你可以移动其中的大部分"为一个班级配备一个沙滩"。
消息传递的好处是:由于不使用互斥锁,即使在极端情况下也不能再死锁。此外,使用消息传递,通常比使用经典多线程更容易创建高水平的并行性。缺点是,移动和复制所有这些消息对象非常耗时,这会降低应用程序的速度。
最后一点:在send_state()
形成的消息中使用弱指针有助于可靠地销毁some_shared_resource
对象;否则,如果A调用B, B调用C, C调用A(可能只是在超时或类似之后),在消息中使用共享指针而不是弱指针将创建循环引用,从而防止对象销毁。如果您确信永远不会有周期,并且处理来自待删除对象的消息不会造成问题,那么当然可以使用shared_from_this()
而不是weak_from_this()
。如果您确定在ASIO停止之前对象不会被删除(并且所有工作线程都被连接回主线程),那么您也可以直接捕获this
指针。
我使用相当好的可持续性库实现了一个异步互斥锁:
class async_mutex
{
cti::continuable<> tail_{cti::make_ready_continuable()};
std::mutex mutex_;
public:
async_mutex() = default;
async_mutex(const async_mutex&) = delete;
const async_mutex& operator=(const async_mutex&) = delete;
[[nodiscard]] cti::continuable<std::shared_ptr<int>> lock()
{
std::shared_ptr<int> result;
cti::continuable<> tail = cti::make_continuable<void>(
[&result](auto&& promise) {
result = std::shared_ptr<int>((int*)1,
[promise = std::move(promise)](auto) mutable {
promise.set_value();
}
);
}
);
{
std::lock_guard _{mutex_};
std::swap(tail, tail_);
}
co_await std::move(tail);
co_return result;
}
};
使用如:
async_mutex mutex;
...
{
const auto _ = co_await mutex.lock();
// only one lock per mutex-instance
}