尝试使用 boost::序列化和 Arcive 实现对象时获取"input stream error"



我正试图使用boost::serialization,boost::Arciveboost::split members(加载和保存)在boost::message queue上发送一个类问题是当我试图反序列化时,我得到input stream error异常

#include <iostream>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/version.hpp>
#include <random>

#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/archive/impl/basic_text_oarchive.ipp>
#include <boost/archive/impl/text_oarchive_impl.ipp>
#include <boost/archive/impl/text_iarchive_impl.ipp>
#include <boost/serialization/split_member.hpp>

class Data{ 
public:
int a_ ;   
double b_ ;
std::string s ;
template<class Archive>   
void serialize(Archive & ar, const unsigned int version)   {
boost::serialization::split_member(ar, *this, version); 
}
template<class Archive>   void save(Archive & ar, unsigned int version) const   {
// ar << order_request_type_;
ar << a_;
ar << b_;
ar << s;   }
template<class Archive>   void load(Archive & ar, unsigned int version)   {
// ar >> order_request_type_;
ar >> a_;
ar >> b_;
ar >> s;
}

private:
// friend class boost::archive::access;
friend class   boost::archive::save_access;
};

[[nodiscard]]bool RunChiled() { 
using namespace boost::interprocess; try {
message_queue mq(open_only   //open or create
, "message_queue"     //name
);
unsigned int priority = 0;
message_queue::size_type recvd_size;
Data d;
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(150);
long long number = 0;
while(true)
{
mq.receive(&serialized_string[0], 150, recvd_size , priority);
std::cout << serialized_string << "n";
iss << serialized_string;
try{
boost::archive::text_iarchive ia(iss);  // <-- getting the exception
ia >> d;
} catch  (const std::exception& ex) {
std::cout << ex.what() << "n";
}
number++;
std::cout << d.a_ << " " << d.b_ << " " << d.s << "n";
}   }catch(const interprocess_exception &ex) {
message_queue::remove("message_queue");
std::cout << "interprocess_exception " << ex.what() << std::endl;
return 1;   } catch (const std::exception& e)   {
std::cout << "exception " << e.what() << std::endl;
message_queue::remove("message_queue");
return 1;   }
message_queue::remove("message_queue");   return true; }

int main() {   std::cout << "1n";   std::default_random_engine generator;   std::uniform_real_distribution<double> distribution(0,15);
using namespace std;
cout << "Boost version: " << BOOST_LIB_VERSION << endl;   using namespace boost::interprocess;   message_queue::remove("message_queue");   auto pid = fork();
if(pid > 0)   {
std::cout << "2n";
sleep(2);
try {
auto res = RunChiled();
std::cout << res;
} catch (...) {
std::cout << "errorn";
} 
} else if(pid == 0)   {
try{    
boost::interprocess::message_queue mq(create_only,"message_queue", 100, 150);
std::stringstream oss;
Data request;       
request.b_ = 17.5;      
request.a_ = I;         
request.s = to_string(17.5) + " " + to_string(i);       
try {           
boost::archive::text_oarchive oa(oss);
oa << request;
} catch (const std::exception& e) {
std::cout << "serialzation:" << e.what() ;
}
try{    
// std::cout << "oss " << oss.str() << "n";         
std::string serialized_string(oss.str());
std::cout << "serialized_string " << oss.str().size() << "n";                   
mq.send(&serialized_string, serialized_string.size(), 0); 
}catch(const std::exception& e){
std::cout << "n send     exeption " << e.what() << "n";
} 
}
}catch (const std::exception& e){             
message_queue::remove("message_queue");
std::cout << e.what() ;
}
}   
return 0;
}

一些重大问题。

首先
  1. mq.send(&serialized_string, serialized_string.size(), 0);
    

    这是未定义的行为,因为serialzed_string不是POD,尺寸也不匹配。你可能是指接收端:

    mq.send(serialized_string.data(), serialized_string.size(), 0);
    
  2. 您正在将serialized_string消息大小调整为150,并且永远不会回到实际的大小。这意味着它将不能正常工作会有尾随数据

    修复:

    std::string buffer(buffer_size, '');
    unsigned int priority = 0;
    message_queue::size_type recvd_size;
    mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
    buffer.resize(recvd_size);
    

其他笔记不拆分序列化可以更简单:

class Data {
public:
int a_;
double b_;
std::string s;
template <class Archive> void serialize(Archive& ar, unsigned) {
ar & a_ & b_ & s;
}
private:
friend class boost::serialization::access; // not required
};

其他笔记

  • return 1return truebool runChiled看起来很可疑-其中一个可能是一个错误

  • [[nodiscard]]在通常不会返回的函数上似乎有点棘手([[noreturn]]可能更合适,可选地只是传递异常?)

  • 你会想要用一些真正随机的东西来播种你的随机生成器:

    std::default_random_engine generator { std::random_device{}() };
    

Live Demo With Fixes

Live On Coliru

Live On Wandbox

#include <iostream>
#include <iomanip>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <boost/core/demangle.hpp>
#include <boost/version.hpp>
#include <random>
#include <thread> // this_thread
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/split_member.hpp>
namespace {
namespace bip = boost::interprocess;
using bip::message_queue;
auto constexpr queuename = "message_queue";
auto constexpr max_queued = 5;
auto constexpr buffer_size = 150;
auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };
using namespace std::chrono_literals;
}
class Data {
public:
int a_{};
double b_{};
std::string s;
template <class Ar> void serialize(Ar& ar, unsigned /*unused*/) {
ar& a_& b_& s;
}
};
[[noreturn]] void RunChild()
{
std::cout << "2" << std::endl;
sleep_for(2s);
try {
message_queue mq(bip::open_only,queuename);
size_t number = 0;
for (std::string buffer(buffer_size, '');; buffer.resize(buffer_size)) {
{
unsigned int priority = 0;
message_queue::size_type recvd_size = 0;
#ifdef COLIRU
// make the process terminate for online compiler
{
using namespace boost::posix_time;
auto deadline = second_clock::universal_time() + seconds(3);
if (not mq.timed_receive(buffer.data(), buffer.size(),
recvd_size, priority, deadline))
{
throw std::runtime_error("no more messages");
}
}
#else
mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
#endif
buffer.resize(recvd_size);
}
//std::cout << buffer << std::endl;
Data d;
try {
std::stringstream iss(buffer);
boost::archive::text_iarchive ia(iss);
ia >> d;
} catch (const std::exception& ex) {
std::cout << ex.what() << std::endl;
}
++number;
std::cout << "Received: " << d.a_ << " " << d.b_ << " "
<< std::quoted(d.s) << std::endl;
}
} catch (const std::exception& e) {
std::cout << boost::core::demangle(typeid(e).name()) << " " << e.what()
<< std::endl;
message_queue::remove(queuename);
throw; // re-raise
}
message_queue::remove(queuename);
}
static void RunParent()
{
std::cout << "1" << std::endl;
std::default_random_engine generator { std::random_device{}() };
std::uniform_real_distribution<double> distribution(0, 15);
message_queue mq(bip::create_only, queuename, max_queued, buffer_size);
for (auto i = 0; i < 10; ++i) {
auto value = distribution(generator);
Data const request {
i,
value,
std::to_string(value) + " " + std::to_string(i)
};
std::stringstream oss;
try {
boost::archive::text_oarchive oa(oss);
oa << request;
std::string buffer = std::move(oss).str();
std::cout << "Sending " << buffer.size() << " bytes" << std::endl;
mq.send(buffer.data(), buffer.size(), 0);
} catch (const std::exception& e) {
std::cout << "nsend exeption " << e.what() << std::endl;
}
}
}
int main() {
std::cout << "Boost version: " << BOOST_LIB_VERSION << std::endl;
message_queue::remove(queuename);
try {
if (auto pid = ::fork(); pid > 0) {
RunChild();
} else if (pid == 0) {
RunParent();
}
} catch (const std::exception& e) {
std::cout << e.what() << std::endl;
message_queue::remove(queuename);
}
}

打印

Boost version: 1_75
2
1
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Received: 0 1.12642 "1.126420 0"
Received: 1 14.2474 "14.247412 1"
Received: 2 3.22163 "3.221631 2"
Sending 73 bytes
Sending 72 bytes
Received: 3 3.20471 "3.204709 3"
Sending 72 bytes
Received: 4 10.7838 "10.783761 4"
Received: 5 5.74063 "5.740629 5"
Received: 6 6.98008 "6.980078 6"
Received: 7 11.6643 "11.664257 7"
Received: 8 3.80561 "3.805614 8"
Received: 9 7.79641 "7.796408 9"
std::runtime_error no more messages
no more messages

最新更新