从两个不同的线程提升asio-async_write



我希望有一个TCP服务器在一个线程中接收数据,并能够从另一个线程发送数据。

我的代码确实呼应了服务器接收到的内容,因此可以从同一个线程进行编写,但无法从主线程发送消息。

我相信这与插座无法使用有关,但我需要推动才能回到正轨,谢谢!

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
do_read();
}
void write(char *msg, std::size_t length)
{
do_write(msg, length);
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
do_write(data_, length);
}
});
}
void do_write(char *msg, std::size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(msg, length),
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
do_read();
}
else
{
std::cout << ec.message() << std::endl;
std::cout << socket_.is_open() << std::endl;
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server
{
public:
server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service)
{
do_accept();
}
void send(char *msg, std::size_t length)
{
std::make_shared<session>(std::move(socket_))->write(msg, length);
}
private:
void do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec)
{
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_server <port>n";
return 1;
}
boost::asio::io_service io_service;
server s(io_service, std::atoi(argv[1]));
std::shared_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
std::thread t1([&io_service]() {io_service.run();});
char msg[] = { 't', 'e', 's', 't' };
while(1)
{ 
sleep(5);
s.send(msg, 4);
}
t1.join();
std::cout << "t1 joined " << std::endl;
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "n";
}
return 0;
} 

首先需要一些同步。我建议把每个客户放在一条线上:

void do_accept() {
acceptor_.async_accept(make_strand(acceptor_.get_executor()),
[this](error_code ec, tcp::socket s) {
if (!ec) {
std::make_shared<session>(std::move(s))->start();
}
do_accept();
});
}

接下来,您需要修改write()成员函数以发布到链:

void write(char const* msg, size_t length)
{
post(socket_.get_executor(), [this, =] {
do_write(msg, length); });
}

请注意,这还不够。async_write是一个组合操作,不能重叠写入:

此操作是根据对流的async_write_some函数的零个或多个调用来实现的,称为组合操作。程序必须确保流在该操作完成之前不执行其他写入操作(如async_write、流的async_wwrite_some函数或任何其他执行写入的组合操作(。

这里的常见方法是创建一个队列:

std::deque<std::string> outbox_;
void do_write(std::string msg) {
auto self(shared_from_this());
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
do_write_loop();
}

队列提供了引用稳定性(对于保持缓冲区的有效性很重要(。write_loop只在排队的第一个元素处启动,并保持写入直到队列为空:

void do_write_loop() {
if (outbox_.empty())
return;

async_write(socket_, boost::asio::buffer(outbox_.front()),
[this, self](error_code ec, size_t length) {
if (!ec) {
outbox_.pop_front();
do_write_loop();
} else {
std::cout << ec.message() << std::endl;
std::cout << socket_.is_open() << std::endl;
}
});
}

现在,你的server::write有点好笑。它创建了一个新的会话,从socket_变量中再次移动(我在前面添加链的过程中删除了它(。这样做永远不会有任何用处。我猜您可能期望它发送给所有连接的会话。在这种情况下:

void send(char const* msg, size_t length) const
{
for (Handle const& handle: _sessions)
if (auto const sess = handle.lock())
sess->write(msg, length);
}

当然,你需要一些东西来包含会话:

private:
using Handle = std::weak_ptr<session>;
std::list<Handle> _sessions;

并将会话存储在do_accept:中

auto sess = std::make_shared<session>(std::move(s));
_sessions.emplace_back(sess);
sess->start();

您可能需要清理断开连接的会话(可选(:

_sessions.remove_if(std::mem_fn(&Handle::expired));

现在,我们还没有完成,因为就像session一样,server需要一条链来确保所有这些操作的线程安全(相对于sessions_(:

完整列表

在Coliru上直播

#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <utility>
using boost::asio::ip::tcp;
using boost::system::error_code;
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
class session : public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
do_read();
}
void write(char const* msg, size_t length)
{
auto self(shared_from_this());
post(socket_.get_executor(),
[this, msg = std::string(msg, length)]() mutable { do_write(std::move(msg)); });
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_), [this, self](error_code ec, size_t length) {
if (!ec)
{
write(data_.data(), length);
do_read();
}
});
}
std::deque<std::string> outbox_;
void do_write(std::string msg)
{
auto self(shared_from_this());
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
do_write_loop();
}
void do_write_loop()
{
if (outbox_.empty())
return;
auto self(shared_from_this());
async_write(
socket_, boost::asio::buffer(outbox_.front()), [this, self](error_code ec, size_t length) {
if (!ec)
{
outbox_.pop_front();
do_write_loop();
}
else
{
std::cout << ec.message() << std::endl;
std::cout << socket_.is_open() << std::endl;
}
});
}
tcp::socket socket_;
std::array<char, 1024> data_;
};
class server
{
public:
server(boost::asio::io_service& io_service, short port)
: acceptor_(make_strand(io_service), tcp::endpoint(tcp::v4(), port))
{
acceptor_.listen();
do_accept();
}
void send(char const* msg, size_t length)
{
post(acceptor_.get_executor(), [this, msg = std::string(msg, length)]() mutable {
for (Handle const& handle : _sessions)
if (auto const sess = handle.lock())
sess->write(msg.data(), msg.size());
});
}
private:
using Handle = std::weak_ptr<session>;
std::list<Handle> _sessions;
void do_accept()
{
acceptor_.async_accept(
make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket s) {
if (!ec)
{
auto sess = std::make_shared<session>(std::move(s));
_sessions.emplace_back(sess);
sess->start();
_sessions.remove_if(std::mem_fn(&Handle::expired));
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_server <port>" << std::endl;
return 1;
}
boost::asio::io_service io_service;
server s(io_service, std::atoi(argv[1]));
auto work = make_work_guard(io_service);
std::thread t1([&io_service]() { io_service.run(); });
char constexpr msg[] = {'t', 'e', 's', 't'};
while (true)
{
sleep_for(5s);
s.send(msg, 4);
}
t1.join();
std::cout << "t1 joined " << std::endl;
}
catch (std::exception const& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
}

最新更新