如何避免并发回调用户定义例程?



我试图修改一些Boost代码,使其与Autoit兼容。原始项目可以在这里找到。我的版本可以在这里找到。我可以使用一些帮助来确定如何防止多个并发回调到用户提供的Autoit例程。

这里是现有的on_read回调——

/// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
/// param ec instance of error code
/// param bytes_transferred
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred) {
if(EnableVerbose)
{
boost::lock_guard<boost::mutex> guard(mtx_);
std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
}       
boost::ignore_unused(bytes_transferred);
{
boost::lock_guard<boost::mutex> guard(mtx_);
if(!Is_Connected) {
return;
}
}
// error occurs
if (ec) {
if(on_fail_cb)
on_fail_cb(L"read");
return fail(ec, L"read");
}
const std::string data = beast::buffers_to_string(buffer_.data());
const std::wstring wdata(data.begin(), data.end());
if(EnableVerbose)
{
boost::lock_guard<boost::mutex> guard(mtx_);
std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
}       
//  The next section is where my issue resides
if (on_data_cb)
on_data_cb(wdata.c_str(), wdata.length());
buffer_.consume(buffer_.size());
if(EnableVerbose)
{
boost::lock_guard<boost::mutex> guard(mtx_);
std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
}       
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
// Close the WebSocket connection
// ws_.async_close(websocket::close_code::normal,
//     beast::bind_front_handler(
//         &session::on_close,
//         shared_from_this()));
}

代码if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length());执行回调到Autoit,我需要知道如何防止它一次执行多次。我不是很精通c++/Boost,所以请温柔。: -)

比较温和的回答是参考文档:strings:使用没有显式锁定的线程

实际上你没有展示足够的代码。例如,我们无法知道

  • 正在使用的执行上下文。如果您正在使用io_context与单个服务线程run()-它,您已经具有隐式链并保证没有处理程序同时运行

  • IO对象绑定到哪个执行器。在您的代码中,唯一可见的对象是ws_,我们假设它类似于

    net::io_context                ctx_;
    websocket::stream<tcp::socket> ws_{ctx_};
    

    现在,如果你想有多个线程服务ctx_,你可以将ws_绑定到一个链执行器:

    websocket::stream<tcp::socket> ws_{make_strand(ctx_)};
    

    现在,只要你确保你自己的访问(例如async_初始化)在正确的链上,你的代码就已经安全了。如果您想要—并且您不介意硬编码执行器类型,您可以这样断言:

    auto strand = ws_get_executor ().targetnet::strand();断言(链,,链→running_in_this_thread ());

专业提示:

如果您确实提交到一个特定的执行器类型,请考虑静态绑定该类型:

using Context  = net::io_context::executor_type;
using Executor = net::io_context::executor_type;
using Strand   = net::strand<net::io_context::executor_type>;
using Socket   = net::basic_stream_socket<tcp, Strand>;
Context                   ctx_;
websocket::stream<Socket> ws_{make_strand(ctx_)};

这避免了类型擦除执行器的开销,并且您可以简化断言:

assert(ws_.get_executor().running_in_this_thread());

  • 使用atomic_bool代替锁定一个完整的互斥锁来检查布尔值。

  • 使用标准库功能(std::mutex,std::lock_guard),使更容易更好

  • 考虑构建ANSI构建,或检查您的扩展方法:

    const std::wstring wdata(data.begin(), data.end());
    

    是一个反模式。查看此处https://en.cppreference.com/w/cpp/string/multibyte或此处获取更多信息https://en.cppreference.com/w/cpp/locale/ctype/widen

演示的"live"代码:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;
static std::mutex s_consoleMtx;
static void fail(beast::error_code ec, std::string txt) {
std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
}
#define ARCH_LABEL "STACKO"
struct session : std::enable_shared_from_this<session> {
using Context  = net::io_context::executor_type;
using Executor = net::io_context::executor_type;
using Strand   = net::strand<net::io_context::executor_type>;
using Socket   = net::basic_stream_socket<tcp, Strand>;
Context                   ctx_;
websocket::stream<Socket> ws_{make_strand(ctx_)};
static bool const  EnableVerbose = true;
std::atomic_bool   Is_Connected  = false;
beast::flat_buffer buffer_;
std::function<void(std::string)>         on_fail_cb;
std::function<void(char const*, size_t)> on_data_cb;
/// Callback registered by async_read. It calls user registered
/// callback to actually process the data. And then issue another
/// async_read to wait for data from server again. 
/// param ec instance of error code 
/// param bytes_transferred
void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
if (EnableVerbose) {
std::lock_guard<std::mutex> guard(s_consoleMtx);
std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
}
if (!Is_Connected)
return;
// error occurs
if (ec) {
if (on_fail_cb)
on_fail_cb("read");
return fail(ec, "read");
}
std::string const data = beast::buffers_to_string(buffer_.data());
if (EnableVerbose) {
std::lock_guard<std::mutex> guard(s_consoleMtx);
std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
}
if (on_data_cb)
on_data_cb(data.c_str(), data.length());
buffer_.consume(buffer_.size());
if (EnableVerbose) {
std::lock_guard<std::mutex> guard(s_consoleMtx);
std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
}
assert(ws_.get_executor().running_in_this_thread());
ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
}
};

相关内容

  • 没有找到相关文章

最新更新