如何避免与"asio::ip::tcp::iostream"的数据竞争?



我的问题

使用两个线程通过asio::ip::tcp::iostream发送和接收时如何避免数据竞争?

设计

我正在编写一个使用asio::ip::tcp::iostream进行输入和输出的程序。 程序通过端口 5555 接受来自(远程)用户的命令,并通过同一 TCP 连接向用户发送消息。 由于这些事件(从用户接收的命令或发送给用户的消息)异步发生,因此我有单独的传输和接收线程。

在这个玩具版本中,命令是"一","二"和"退出"。 当然,"退出"会退出该程序。 其他命令不执行任何操作,任何无法识别的命令都会导致服务器关闭 TCP 连接。

传输的消息是简单的序列号消息,每秒发送一次。

在这个玩具版本和我尝试编写的真实代码中,传输和接收过程都使用阻塞IO,因此似乎没有使用std::mutex或其他同步机制的好方法。 (在我的尝试中,一个进程会抓取互斥锁然后阻止,这对此不起作用。

构建和测试

为了构建和测试它,我在 64 位 Linux 机器上使用 gcc 版本 7.2.1 和 valgrind 3.13。 建:

g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread

为了进行测试,我使用以下命令运行服务器:

valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 

然后,我在另一个窗口中使用telnet 127.0.0.1 5555来创建与服务器的连接。helgrind正确地指出的是,存在数据竞争,因为runTxrunRx都试图异步访问同一流:

==16188== 线程 #1 在0x1FFEFFF1CC读取大小 1 期间可能的数据争用

==16188== 持锁数:无

。省略了更多行

并发.cpp

#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream *in, std::ostream *out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream *in);
int runRx(std::ostream *out);
bool want_quit;
bool want_reset;
};
int Console::runTx(std::istream *in) {
static const std::array<std::string, 3> cmds{
"quit", "one", "two", 
};
std::string command;
while (!want_quit && !want_reset && *in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]n";
} else {
std::cout << command << 'n';
}
}
return 0;
}
int Console::runRx(std::ostream *out) {
for (int i=0; !(want_reset || want_quit); ++i) {
(*out) << "This is message number " << i << 'n';
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
out->flush();
}
return 0;
}
int Console::run(std::istream *in, std::ostream *out) {
want_reset = false;
std::thread t1{&Console::runRx, this, out};
int status = runTx(in);
t1.join();
return status;
}
int main()
{
Console con;
asio::io_service ios;
// IPv4 address, port 5555
asio::ip::tcp::acceptor acceptor(ios, 
asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
con.run(&stream, &stream);
if (con.wantReset()) {
std::cout << "resettingn";
}
}
}

是的,您正在共享流底层的套接字,而不同步

旁注,与布尔标志相同,可以通过更改轻松"修复":

std::atomic_bool want_quit;
std::atomic_bool want_reset;

如何解决

老实说,我认为没有好的解决方案。你自己说过:操作是异步的,所以如果你尝试同步执行它们,你会有麻烦。

你可以试着想一下黑客。如果我们基于相同的底层套接字(文件描述符)创建一个单独的流对象会怎样?这并不容易,因为这样的流不是 Asio 的一部分。

但是我们可以使用 Boost Iostreams 破解一个:

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>
// .... later:
// HACK: procure a _separate `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
con.run(stream, hack_ostream);

事实上,这在没有争用的情况下运行(在同一套接字上同时读取和写入是可以的,只要您不共享包装它们的非线程安全 Asio 对象)。

我推荐的是:

不要那样做。这是一个笨蛋。你正在使事情复杂化,显然是为了避免使用异步代码。我会咬紧牙关。

从服务逻辑中考虑 IO 机制并不需要太多工作。你最终将不受随机限制(你可以考虑处理多个客户端,你可以完全没有任何线程等)。

如果您想了解一些中间立场,请查看堆叠协程 (http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html)

清单

仅供参考

注意 我进行了重构以消除对指针的需求。您不会转让所有权,因此引用就可以了。如果您不知道如何将引用传递给bind/std::thread构造函数,诀窍就在您将看到的std::ref中。

[对于压力测试,我大大减少了延迟。

住在科里鲁

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream &in, std::ostream &out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream &in);
int runRx(std::ostream &out);
std::atomic_bool want_quit;
std::atomic_bool want_reset;
};
int Console::runTx(std::istream &in) {
static const std::array<std::string, 3> cmds{
{"quit", "one", "two"}, 
};
std::string command;
while (!want_quit && !want_reset && in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]n";
} else {
std::cout << command << 'n';
}
}
return 0;
}
int Console::runRx(std::ostream &out) {
for (int i=0; !(want_reset || want_quit); ++i) {
out << "This is message number " << i << 'n';
std::this_thread::sleep_for(std::chrono::milliseconds(1));
out.flush();
}
return 0;
}
int Console::run(std::istream &in, std::ostream &out) {
want_reset = false;
std::thread t1{&Console::runRx, this, std::ref(out)};
int status = runTx(in);
t1.join();
return status;
}
#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>
int main()
{
Console con;
boost::asio::io_service ios;
// IPv4 address, port 5555
boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
{
// HACK: procure a _separate `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
con.run(stream, hack_ostream);
}
if (con.wantReset()) {
std::cout << "resettingn";
}
}
}

测试:

netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2

commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands}))]}; done | (while netcat localhost 5555; do sleep 1; done)

无限期运行,偶尔重置连接(当命令"三"已发送时)。

最新更新