异步等待一个条件变量



是否可以对boost::asio中的条件变量执行异步等待(读取:非阻塞)?如果不直接支持,请提供实现它的提示。

我可以实现一个计时器,甚至每隔几毫秒触发一次唤醒,但这种方法非常低劣,我发现很难相信没有实现/记录条件变量同步。

如果我理解的意图正确,你想启动一个事件处理程序,当一些条件变量是信号,在一个线程池的上下文中?我认为等待处理程序开始时的条件变量,以及io_service::post()本身最终回到池中就足够了,类似于:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
    boost::unique_lock<boost::mutex> lk(mx);
         cv.wait(lk);
    std::cout << "handler awakenedn";
    io.post(handler);
}
void buzzer()
{
    for(;;)
    {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
            cv.notify_all();
    }
}
int main()
{
    io.post(handler);
    boost::thread bt(buzzer);
    io.run();
}

我可以建议基于boost::asio::deadline_timer的解决方案,这对我来说很好。这是boost::asio环境中的一种异步事件。一个非常重要的事情是,'handler'必须通过与'cancel'相同的'strand_'来序列化,因为在多个线程中使用'boost::asio::deadline_timer'不是线程安全的。

class async_event
{
public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    {}
    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) {
        deadline_timer_.async_wait(handler);
    }
    void async_notify_one() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    }
    void async_notify_all() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    }
private:
    void async_notify_one_serialized() {
        deadline_timer_.cancel_one();
    }
    void async_notify_all_serialized() {
        deadline_timer_.cancel();
    }
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
};

遗憾的是,Boost ASIO没有async_wait_for_condvar()方法。

在大多数情况下,您也不需要它。用ASIO方式编程通常意味着使用链(而不是互斥锁或条件变量)来保护共享资源。除了一些罕见的情况(通常是在启动和退出时关注正确的构造或销毁顺序),您根本不需要互斥锁或条件变量。

当修改共享资源时,经典的部分同步线程方式如下:

  • 锁定保护资源的互斥锁
  • 更新任何需要更新的
  • 通知一个条件变量,如果等待线程需要进一步处理
  • 解锁互斥锁

完全异步的ASIO方式是:

  • 生成包含更新资源所需的所有内容的消息
  • 发送一个调用更新处理程序的消息到资源链
  • 如果需要进一步处理,让更新处理程序创建更多的消息并将它们发布到适当的资源链。
  • 如果作业可以在完全私有的数据上执行,那么直接将它们发布到io-context中。

下面是一个类some_shared_resource的例子,它接收一个字符串state,并根据接收到的状态触发一些进一步的处理。请注意,私有方法some_shared_resource::receive_state()中的所有处理都是完全线程安全的,因为链将所有调用序列化。

当然,这个例子是不完整的;some_other_resource需要一个类似some_shared_ressource::send_state()send_code_red()方法。

#include <boost/asio>
#include <memory>
using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;
class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;
    void receive_state(std::string&& new_state) {
        std::string oldstate = std::exchange(state, new_state);
        if(state == "red" && oldstate != "red") {
            // state transition to "red":
            other.send_code_red(true);
        } else if(state != "red" && oldstate == "red") {
            // state transition from "red":
            other.send_code_red(false);
        }
    }
public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) {}
    void send_state(std::string&& new_state) {
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
            if(auto self = me.lock(); self) {
                self->receive_state(std::move(new_state));
            }
        });
    }
};

正如你所看到的,总是在ASIO的链中发布一开始可能有点乏味。但你可以移动其中的大部分"为一个班级配备一个沙滩"。

消息传递的好处是:由于不使用互斥锁,即使在极端情况下也不能再死锁。此外,使用消息传递,通常比使用经典多线程更容易创建高水平的并行性。缺点是,移动和复制所有这些消息对象非常耗时,这会降低应用程序的速度。

最后一点:在send_state()形成的消息中使用弱指针有助于可靠地销毁some_shared_resource对象;否则,如果A调用B, B调用C, C调用A(可能只是在超时或类似之后),在消息中使用共享指针而不是弱指针将创建循环引用,从而防止对象销毁。如果您确信永远不会有周期,并且处理来自待删除对象的消息不会造成问题,那么当然可以使用shared_from_this()而不是weak_from_this()。如果您确定在ASIO停止之前对象不会被删除(并且所有工作线程都被连接回主线程),那么您也可以直接捕获this指针。

我使用相当好的可持续性库实现了一个异步互斥锁:

class async_mutex
{
    cti::continuable<> tail_{cti::make_ready_continuable()};
    std::mutex mutex_;
public:
    async_mutex() = default;
    async_mutex(const async_mutex&) = delete;
    const async_mutex& operator=(const async_mutex&) = delete;
    [[nodiscard]] cti::continuable<std::shared_ptr<int>> lock()
    {
        std::shared_ptr<int> result;
        cti::continuable<> tail = cti::make_continuable<void>(
            [&result](auto&& promise) {
                result = std::shared_ptr<int>((int*)1,
                    [promise = std::move(promise)](auto) mutable {
                        promise.set_value();
                    }
                );
            }
        );
        {
            std::lock_guard _{mutex_};
            std::swap(tail, tail_);
        }
        co_await std::move(tail);
        co_return result;
    }
};

使用如:

async_mutex mutex;
...
{
    const auto _ = co_await mutex.lock();
    // only one lock per mutex-instance
}

相关内容

  • 没有找到相关文章

最新更新