我正在尝试通过 TCP 套接字发送结构体。下面的代码在 netcom_server() 函数中构造
boost::archive::text_iarchive 对象时崩溃,
错误信息如下。如果您还发现其他缺陷,也请指出。我运行了这段代码的两个独立实例:一个运行 netcom_server()
函数,另一个运行 netcom_client()
。
Connection Accepted :
Success, bytes receieved 32
terminate called after throwing an instance of 'boost::archive::archive_exception'
what(): input stream error
Aborted (core dumped)
// Record exchanged between netcom_client() (which archives it) and
// netcom_server() (which restores it).
typedef struct elem{
bool bit;
int count;
uint64_t hashSum[2];
uint64_t idSum;
// Boost.Serialization member hook: passes every field through `ar`, in
// declaration order. `version` is accepted but not used.
template <typename Archive>
void serialize(Archive &ar, const unsigned int version){
ar & bit;
ar & count;
ar & hashSum[0];ar & hashSum[1];
ar & idSum;
}
}element;
// Boost.Exception error_info carrying a std::string message; nothing in the
// code shown here uses it.
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info;
// Server side of the transfer: accepts a single TCP connection on `portn`,
// then for each index of the local `recvd_bloomdb` reads sizeof(element)
// bytes, tries to restore an `element` from them as a text archive, and
// stores it with indexins_elem(). Never returns: it ends in an endless
// sleep loop. The `bloom_db` parameter is not used.
void netcom_server(int portn, bloom_filter& bloom_db){
io_service io_service;
element recv_elem;
std::string bufdata;
boost::asio::mutable_buffer rcv_buf;
// Declared but never used in this function.
boost::system::error_code ec;
bloom_filter recvd_bloomdb(4,"rcvbloom_db",4096);
unsigned int i;
ip::tcp::socket server_socket(io_service);
tcp::acceptor acceptor_server(io_service,tcp::endpoint(tcp::v4(), portn));
acceptor_server.accept(server_socket);
cout<<"Connection Accepted : "<<endl;
for(i=0; i<recvd_bloomdb.size(); ++i){
// NOTE(review): sizeof(element) is the in-memory size of the struct, not the
// length of the text archive the client produced. The gdb dump further down
// shows that the 32 bytes received hold only the start of the archive, which
// is why reading it fails with "input stream error".
bufdata.resize(sizeof(element));
rcv_buf = boost::asio::buffer(bufdata,sizeof(element));
getData(server_socket,rcv_buf);
try{
std::istringstream is(bufdata);
boost::archive::text_iarchive in_archive {is};
in_archive >> recv_elem;
}
catch(std::exception &e){
std::cout << e.what();
}
// Runs even when restoring threw, in which case recv_elem still holds
// whatever it held before this iteration.
recvd_bloomdb.indexins_elem(recv_elem,i);
}
cout<<"Received "<<i<<" elements at server"<<endl<<std::flush;
recvd_bloomdb.show_elemvec();
// Keeps the process alive forever after the transfer.
while(1) sleep(5);
}
// Client side of the transfer: connects to `serverip`:`portn` and, for every
// index of `bloom_db`, copies the element into `snd_elem`, writes it to a text
// archive and sends the first sizeof(element) bytes of the stream contents.
void netcom_client(int portn, string serverip, bloom_filter& bloom_db){
io_service io_service;
ip::tcp::socket client_socket(io_service);
boost::asio::const_buffer snd_buf;
// Declared but never used in this function.
boost::system::error_code ec;
element snd_elem;
string bufdata;
// NOTE(review): the stream and the archive are created once and live for the
// whole loop, so os.str() below returns everything written so far rather than
// only the current element, and the archive is never finished before its
// output is read.
std::stringstream os;
boost::archive::text_oarchive out_archive {os};
unsigned int i;
client_socket.connect(tcp::endpoint(address::from_string(serverip),portn));
for(i=0; i< bloom_db.size(); ++i){
::set_elemvec(snd_elem,bloom_db.getelem(i));
out_archive << snd_elem;
bufdata = os.str();
// NOTE(review): the buffer is capped at sizeof(element) bytes, which is
// unrelated to the length of the serialized text held in bufdata.
snd_buf = boost::asio::buffer(bufdata, sizeof(element));
bloom_db.show_elem(i);
sendData(client_socket,snd_buf);
}
cout<<"Sent "<<i<<" elements from client"<<endl<<std::flush;
}
// Writes `data` to `socket` with boost::asio::write, using the error_code
// overload so that no exception is thrown, then prints the error message and
// the number of bytes written to std::cout.
void sendData(tcp::socket& socket, boost::asio::const_buffer& data)
{
boost::system::error_code ec;
std::size_t bytes_transferred =boost::asio::write(socket,data,ec);
// Extra diagnostic for the case where nothing was written because the
// operation would block.
if (bytes_transferred == 0 && ec == boost::asio::error::would_block)
{
std::cout << " Could not send any more" << std::endl;
}
std::cout << ec.message() << ", bytes sent "<<bytes_transferred<<std::endl;
}
// Reads from `socket` into `data` with boost::asio::read, using the error_code
// overload so that no exception is thrown, then prints the error message and
// the number of bytes read to std::cout.
void getData(tcp::socket& socket, boost::asio::mutable_buffer& data)
{
boost::system::error_code ec;
std::size_t bytes_transferred = boost::asio::read(socket, data,ec);
// Extra diagnostic for the case where nothing was read because the operation
// would block.
if (bytes_transferred == 0 && ec == boost::asio::error::would_block)
{
std::cout << "No data available" << std::endl;
}
std::cout << ec.message() << ", bytes receieved "<<bytes_transferred<<std::endl;
}
补充:来自 gdb 的 streambuf 数据
(gdb) p is._M_streambuf
$4 = (std::basic_streambuf<char, std::char_traits<char> > *) 0x7ffd1e567050
(gdb) p *is._M_streambuf
$5 = {
_vptr.basic_streambuf = 0x7f3774a37618 <vtable for std::__cxx11::basic_stringbuf<char, std::char_traits<char>, std::allocator<char> >+16>,
_M_in_beg = 0x557e228c4600 "22 serialization::archive 17 0 0",
_M_in_cur = 0x557e228c4620 "",
_M_in_end = 0x557e228c4620 "",
_M_out_beg = 0x0,
_M_out_cur = 0x0,
_M_out_end = 0x0,
_M_buf_locale = {
static none = 0,
static ctype = 1,
static numeric = 2,
static collate = 4,
static time = 8,
static monetary = 16,
static messages = 32,
static all = 63,
_M_impl = 0x557e228c6c40,
static _S_classic = 0x7f3774a3ec20 <(anonymous namespace)::c_locale_impl>,
static _S_global = 0x7f3774a3ec20 <(anonymous namespace)::c_locale_impl>,
static _S_categories = 0x7f3774a30760 <__gnu_cxx::category_names>,
static _S_once = 2,
static _S_twinned_facets = 0x7f3774a31fe0 <std::locale::_S_twinned_facets>
}
}
有几个问题。
-
缓冲区的大小不正确。正如评论者所说,
sizeof(element)
与序列化存档格式的长度无关(后者会更长,参见例如:
- Boost 序列化的 C++ 开销
- 如何抑制 Boost serialization::archive 中的额外信息?)
-
将其更改为仅
buffer(bufdata)
也不起作用,因为那只是一个零长度的缓冲区(std::string
默认构造为空字符串)。请考虑先把它调整到合适的长度,或者使用 boost::asio::dynamic_buffer
(如果您的 Boost 版本足够新)。
-
stringstream 和 out_archive 的生存期太长,导致流中的内容可能不完整。在使用结果之前,请确保存档已经完成(析构)并且流已刷新:
std::string bufdata; { std::stringstream os; { boost::archive::text_oarchive out_archive{ os }; out_archive << snd_elem; } bufdata = os.str(); }
-
这一行
recvd_bloomdb.indexins_elem(recv_elem, i);
看起来很可疑:您似乎在迭代的同时修改了
recvd_bloomdb
(这可能导致无限循环或未定义行为,具体取决于 indexins_elem
的预期工作方式)。
-
事实上,
bloom_filter
的各个副本之间的关系并不清楚(服务器代码中甚至没有使用 bloom_db
参数)。如果要序列化整个数据库,为什么不让 bloom_filter
本身可序列化呢?
如果您想在同一连接上发送多个存档,那么还缺少消息分帧(framing)。您可以选择使用分隔符。对于文本存档,NUL 字符不是合法内容,因此可以用作分隔符:
bufdata += '\x00'; sendData(client_socket, boost::asio::buffer(bufdata));
在接收端:
std::size_t bytes_transferred = boost::asio::read_until(socket, data, '\x00', ec);
修正
以下是解决/修复上述问题的代码:
在 Coliru 上在线运行(Live On Coliru)
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/serialization.hpp>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
namespace {
// Very simple hack to synchronize output to std::cout: each PRINT holds s_mx
// while the whole `<<` chain is streamed, so lines from different threads do
// not interleave.
// The body is wrapped in do/while(0) so that `PRINT(x);` is exactly one
// statement and stays correct inside an unbraced if/else.
#define PRINT(e) do { std::lock_guard<std::mutex> lk(s_mx); std::cout << e << std::endl; } while (0)
static std::mutex s_mx;
}
// Short alias for the Asio namespace, used by the code that follows.
namespace ba = boost::asio;
using ba::ip::tcp;
// Dynamic buffer adaptor over a std::string; this is the parameter type of getData().
using string_buffer = boost::asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char> >;
// Forward declarations: both helpers are defined after the functions that call them.
void sendData(tcp::socket& socket, boost::asio::const_buffer data);
void getData(tcp::socket& socket, string_buffer data);
// One database record. Every field has a default so that a default-constructed
// record is fully initialised.
struct elem {
    bool bit = false;
    int count = 0;
    uint64_t hashSum[2] = {};
    uint64_t idSum = 0;

    // Boost.Serialization hook: streams the fields through `ar` in
    // declaration order. The version argument is ignored.
    template <typename Archive> void serialize(Archive& ar, unsigned) {
        ar & bit & count & hashSum[0] & hashSum[1] & idSum;
    }

    // Prints the record as "[bit,count,{hash0,hash1},id]": bit as true/false,
    // count in decimal, the remaining numbers in hex with a base prefix.
    // Formatting happens on a private stream so the caller's stream flags
    // are left untouched.
    friend std::ostream& operator<<(std::ostream& os, elem const& e) {
        std::ostringstream text;
        text.setf(std::ios_base::showbase | std::ios_base::boolalpha);
        text << '[' << e.bit << ',';
        text << std::dec << e.count << ',';
        text << std::hex << '{' << e.hashSum[0] << ',' << e.hashSum[1] << "},";
        text << e.idSum << ']';
        os << text.str();
        return os;
    }
};
// Second name for the same type, used interchangeably with `elem`.
typedef elem element;
namespace mocks {
struct bloom_filter {
std::vector<elem> mock_data{
{ false, 1, { 0x7e, 0xeb }, 42 },
};
bloom_filter(...) {}
size_t size() const { return mock_data.size(); }
void indexins_elem(elem const& e, size_t i) {
assert(i < size());
mock_data.at(i) = e;
//Danger: this grows the db while it is being iterated
//mock_data.insert(mock_data.begin() + i, e);
}
void show_elemvec() const {
for (auto& el : mock_data) {
PRINT(el);
}
}
void show_elem(size_t i) const { PRINT(mock_data.at(i)); }
elem getelem(size_t i) { return mock_data.at(i); }
};
void set_elemvec(element& lhs, element const& rhs) { lhs = rhs; }
}
using mocks::bloom_filter;
using mocks::set_elemvec;
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info;
namespace {
// Frame terminator used on the wire. A text archive never contains a NUL
// byte, so a single NUL unambiguously marks the end of one serialized element.
// (The multi-character literal 'x00' is NOT a NUL byte.)
constexpr char kFrameDelimiter = '\0';
} // namespace

// Accepts connections on `portn` forever. For each connection it reads one
// NUL-terminated text archive per slot of the local `recvd_bloomdb`, restores
// an `element` from it and stores it in that slot, then prints the database
// and closes the connection. The second parameter is unused.
void netcom_server(unsigned short portn, bloom_filter& /*bloom_db*/) {
    ba::io_service io_service;
    bloom_filter recvd_bloomdb(4, "rcvbloom_db", 4096);
    tcp::socket server_socket(io_service);
    tcp::acceptor acceptor_server(io_service, {tcp::v4(), portn});

    while (1) {
        acceptor_server.accept(server_socket);
        // Query the peer once, while the connection is known to be up, and
        // reuse it for the closing message.
        auto const peer = server_socket.remote_endpoint();
        PRINT("Connection Accepted : " << peer);

        // Bytes received but not yet consumed. read_until may read past the
        // delimiter, so this must survive from one frame to the next.
        std::string pending;

        for (auto i = 0u; i < recvd_bloomdb.size(); ++i) {
            getData(server_socket, ba::dynamic_buffer(pending));

            // Split one frame off the front; if no delimiter arrived (error or
            // EOF) everything received so far is treated as the frame.
            auto const frame_end = pending.find(kFrameDelimiter);
            std::string const frame = pending.substr(0, frame_end);
            pending.erase(0, frame_end == std::string::npos ? frame_end : frame_end + 1);

            element recv_elem;
            try {
                std::istringstream is(frame);
                PRINT("RECV DEBUG: " << std::quoted(is.str()));
                {
                    boost::archive::text_iarchive in_archive{ is };
                    in_archive >> recv_elem;
                }
                // Only store the element when it was restored successfully.
                recvd_bloomdb.indexins_elem(recv_elem, i);
            } catch (std::exception const& e) {
                PRINT(e.what());
            }
        }

        PRINT("Received " << recvd_bloomdb.size() << " elements at server");
        recvd_bloomdb.show_elemvec();
        PRINT("Close connection " << peer);
        server_socket.close();
    }
}

// Serializes one element into a text archive and returns the archive text.
// The archive object is destroyed before the stream is read, so the returned
// string is a complete archive.
std::string archive_text(elem const& e) {
    std::stringstream os;
    {
        boost::archive::text_oarchive out_archive{ os };
        out_archive << e;
    }
    return os.str();
}

// Connects to `serverip`:`portn`, waits one second, then sends every element
// of `bloom_db` as its own text archive followed by the frame delimiter.
void netcom_client(unsigned short portn, std::string serverip, bloom_filter& bloom_db) {
    ba::io_service io_service;
    tcp::socket client_socket(io_service);
    client_socket.connect({ba::ip::address::from_string(serverip), portn});
    std::this_thread::sleep_for(std::chrono::seconds(1));

    for (auto i = 0u; i < bloom_db.size(); ++i) {
        bloom_db.show_elem(i);
        auto bufdata = archive_text(bloom_db.getelem(i));
        PRINT("SEND DEBUG: " << std::quoted(bufdata));
        bufdata += kFrameDelimiter;
        sendData(client_socket, boost::asio::buffer(bufdata));
    }
    PRINT("Exit netcom_client");
}

// Writes the whole buffer to `socket` (non-throwing overload) and prints the
// resulting error message and byte count.
void sendData(tcp::socket& socket, boost::asio::const_buffer data) {
    boost::system::error_code ec;
    std::size_t bytes_transferred = boost::asio::write(socket, data, ec);
    if (bytes_transferred == 0 && ec == boost::asio::error::would_block) {
        PRINT(" Could not send any more");
    }
    PRINT(ec.message() << "bytes sent " << bytes_transferred);
}

// Reads from `socket` into `data` until the frame delimiter has been seen
// (non-throwing overload) and prints the resulting error message and the
// byte count reported by read_until. `data` may end up holding bytes beyond
// the delimiter.
void getData(tcp::socket& socket, string_buffer data) {
    boost::system::error_code ec;
    std::size_t bytes_transferred = boost::asio::read_until(socket, data, kFrameDelimiter, ec);
    if (bytes_transferred == 0 && ec == boost::asio::error::would_block) {
        PRINT("No data available");
    }
    PRINT(ec.message() << "bytes received " << bytes_transferred);
}
// Demo driver: runs the server in one thread and four consecutive client
// sessions in another, all against port 6767 on localhost.
int main() {
    auto const run_server = [] {
        bloom_filter db;
        netcom_server(6767, db);
    };

    auto const run_clients = [] {
        int remaining = 4;
        while (remaining-- > 0) {
            // Pause before each session.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            bloom_filter db;
            netcom_client(6767, "127.0.0.1", db);
        }
    };

    std::thread server_thread(run_server);
    std::thread client_thread(run_clients);

    server_thread.join();
    client_thread.join();
}
输出类似于:
Connection Accepted : 127.0.0.1:38126
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38126
Connection Accepted : 127.0.0.1:38128
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38128
Connection Accepted : 127.0.0.1:38130
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38130
Connection Accepted : 127.0.0.1:38132
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Successbytes sent 49
Exit netcom_client
Successbytes received 49
RECV DEBUG: "22 serialization::archive 17 0 0 0 1 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
Close connection 127.0.0.1:38132
附加内容(Bonus)
让 bloom_filter
类型本身可序列化可能很容易。另外请注意,只要 elem
是 POD 类型,它就可以按位序列化,因此可以利用这一点:
此版本:
- 演示如何切换到二进制存档,并对 vector 使用按位序列化
- 演示如何一次性序列化整个数据库
- 演示如何省略存档头
- 将分帧逻辑移入
getData
和 sendData
,并使分隔符取决于存档类型(单个 NUL
字节在二进制流中是合法数据,不能单独用作分隔符)
- 同时大大简化了服务器/客户端代码
注意:
- 二进制存档不可移植(只能指望在相同的体系结构和相同的版本之间共享)
- 当前的分帧实现不会保留
read_until
多读到的任何剩余数据。这在 TCP 中一般是可能发生的,但对于这里展示的代码不会出现。只是需要留意
在 Coliru 上在线运行(Live On Coliru)
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/binary_object.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>
#include <iomanip>
#include <mutex>
namespace {
// Very simple hack to synchronize output to std::cout: each PRINT holds s_mx
// while the whole `<<` chain is streamed, so lines from different threads do
// not interleave.
// The body is wrapped in do/while(0) so that `PRINT(x);` is exactly one
// statement and stays correct inside an unbraced if/else.
#define PRINT(e) do { std::lock_guard<std::mutex> lk(s_mx); std::cout << e << std::endl; } while (0)
static std::mutex s_mx;
}
// Short alias for the Asio namespace, used by the code that follows.
namespace ba = boost::asio;
using ba::ip::tcp;
// Forward declarations: both helpers are defined after the functions that call them.
void sendData(tcp::socket& socket, std::string const& data);
std::string getData(tcp::socket& socket);
// One database record. Every field has a default so that a default-constructed
// record is fully initialised.
struct element {
    bool bit = false;
    int count = 0;
    uint64_t hashSum[2] = {};
    uint64_t idSum = 0;

    // Prints the record as "[bit,count,{hash0,hash1},id]": bit as true/false,
    // count in decimal, the remaining numbers in hex with a base prefix.
    // Formatting happens on a private stream so the caller's stream flags
    // are left untouched.
    friend std::ostream& operator<<(std::ostream& os, element const& e) {
        std::ostringstream text;
        text.setf(std::ios_base::showbase | std::ios_base::boolalpha);
        text << '[' << e.bit << ',';
        text << std::dec << e.count << ',';
        text << std::hex << '{' << e.hashSum[0] << ',' << e.hashSum[1] << "},";
        text << e.idSum << ']';
        os << text.str();
        return os;
    }
};
BOOST_IS_BITWISE_SERIALIZABLE(element)
namespace mocks {
struct bloom_filter {
bloom_filter(...) {}
size_t size() const { return mock_data.size(); }
void indexins_elem(element const& e, size_t i) {
mock_data.at(i) = e;
//// Danger: this grows the db while it is being iterated
//assert(i <= size());
//mock_data.insert(mock_data.begin() + i, e);
}
void show_elemvec() const {
for (auto& el : mock_data) PRINT(el);
}
void show_elem(size_t i) const { PRINT(mock_data.at(i)); }
element getelem(size_t i) { return mock_data.at(i); }
private:
std::vector<element> mock_data{
{ false, 1, { 0x7e, 0xeb }, 42 },
};
friend class boost::serialization::access;
template <typename Archive> void serialize(Archive& ar, unsigned) {
ar& mock_data;
}
};
}
// Compile-time switch between the two archive flavours. With USE_TEXT defined
// the text archives are used; otherwise the binary ones. Each branch provides
//   ARCHIVE_    - token prefix pasted onto "oarchive"/"iarchive",
//   DELIMITER   - byte sequence that terminates one frame on the wire,
//   safe_print  - wrapper that prints a payload in a terminal-friendly way.
#define USE_TEXT
#ifdef USE_TEXT
    #define ARCHIVE_ text_
    // A single NUL byte. The explicit length is required because the literal
    // starts with '\0' and would otherwise be taken as an empty C string.
    static const std::string DELIMITER("\x00", 1);

    // Prints the payload as a quoted string.
    struct safe_print {
        std::string const& data;
        friend std::ostream& operator<<(std::ostream& os, safe_print wrap) {
            return os << std::quoted(wrap.data);
        }
    };

    // Text archives cannot use the bitwise shortcut, so `element` needs a
    // field-by-field serialize function.
    namespace boost { namespace serialization {
        template <typename Ar>
        void serialize(Ar& ar, element& e, unsigned) {
            ar & e.bit & e.count & e.hashSum & e.idSum;
        }
    } }
#else
    #define ARCHIVE_ binary_
    // NUL alone can occur inside binary data, so a longer 5-byte marker is
    // used: 00 DE AD BE EF.
    static const std::string DELIMITER("\x00\xde\xad\xbe\xef", 5);

    // Prints the payload as a run of two-digit hex bytes.
    struct safe_print {
        std::string const& data;
        friend std::ostream& operator<<(std::ostream& os, safe_print wrap) {
            std::ostringstream oss;
            oss << std::hex << std::setfill('0');
            for (uint8_t ch : wrap.data) {
                oss << std::setw(2) << static_cast<int>(ch);
            }
            return os << oss.str();
        }
    };
#endif
template <typename T>
std::string archive(T const& data) {
std::stringstream os;
{
boost::archive::BOOST_PP_CAT(ARCHIVE_, oarchive) out_archive{ os, boost::archive::no_header };
out_archive << data;
}
return os.str();
}
template <typename T>
void restore(std::string const& text, T& into) {
std::istringstream is(text);
boost::archive::BOOST_PP_CAT(ARCHIVE_, iarchive) ia{is, boost::archive::no_header };
ia >> into;
}
using mocks::bloom_filter;
// Accepts connections on `portn` forever. Each connection delivers one frame
// holding a whole serialized bloom_filter, which is restored and printed.
void netcom_server(unsigned short portn) {
    ba::io_service io_service;
    tcp::acceptor acceptor_server(io_service, {tcp::v4(), portn});

    for (;;) {
        // A fresh socket per connection; it is closed when it goes out of scope.
        tcp::socket server_socket(io_service);
        acceptor_server.accept(server_socket);
        PRINT("Connection Accepted : " << server_socket.remote_endpoint());

        bloom_filter recvd_bloomdb;
        std::string const payload = getData(server_socket);
        restore(payload, recvd_bloomdb);

        PRINT("Received " << recvd_bloomdb.size() << " elements at server");
        recvd_bloomdb.show_elemvec();
    }
}
void netcom_client(unsigned short portn, std::string serverip, bloom_filter& bloom_db) {
ba::io_service io_service;
tcp::socket client_socket(io_service);
client_socket.connect({ba::ip::address::from_string(serverip), portn});
sendData(client_socket, archive(bloom_db));
PRINT("Exit netcom_client");
}
// Sends one frame: the payload followed by DELIMITER, written with a single
// gather-write so the payload is not copied. Prints the payload and the total
// number of bytes written.
void sendData(tcp::socket& socket, std::string const& data) {
    PRINT("SEND DEBUG: " << safe_print{data});

    std::vector<ba::const_buffers_1> frame;
    frame.reserve(2);
    frame.push_back(ba::buffer(data));
    frame.push_back(ba::buffer(DELIMITER));

    auto const sent = boost::asio::write(socket, frame);
    PRINT("bytes sent " << sent);
}
// Receives one frame: reads from `socket` until DELIMITER has been seen and
// returns the bytes read, cut to the length reported by read_until. Prints the
// byte count and the returned data.
std::string getData(tcp::socket& socket) {
    std::string received;
    auto const frame_len =
        boost::asio::read_until(socket, ba::dynamic_buffer(received), DELIMITER);
    PRINT("bytes received " << frame_len);

    // Discard anything that was read beyond the reported length.
    received.resize(frame_len);
    PRINT("RECV DEBUG: " << safe_print{received});
    return received;
}
// Demo driver: prints sizeof(element) for reference, then runs the server in
// one thread and four consecutive client sessions in another, all against
// port 6767 on localhost.
int main() {
    PRINT("For info: sizeof(element) = " << sizeof(element));

    // The server keeps no database of its own, so no bloom_filter is created here.
    std::thread s([] {
        netcom_server(6767);
    });

    std::thread c([] {
        for (int i = 0; i < 4; ++i) {
            // Pause before each session.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            bloom_filter db;
            netcom_client(6767, "127.0.0.1", db);
        }
    });

    s.join();
    c.join();
}
输出:
For info: sizeof(element) = 32
Connection Accepted : 127.0.0.1:38134
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Connection Accepted : 127.0.0.1:38136
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Connection Accepted : 127.0.0.1:38138
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]
SEND DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
bytes sent 34
Exit netcom_client
Connection Accepted : 127.0.0.1:38140
bytes received 34
RECV DEBUG: "0 0 0 0 1 0 0 0 0 1 2 126 235 42
"
Received 1 elements at server
[false,1,{0x7e,0xeb},0x2a]